Compare commits
	
		
			No commits in common. "0c425be8091eb46ab4a38bfb605cb89644794a1e" and "87f6bf8e211258d6f620b401371ba3c77582db4d" have entirely different histories.
		
	
	
		
			0c425be809
			...
			87f6bf8e21
		
	
		
							
								
								
									
										159
									
								
								campatmo.py
								
								
								
								
							
							
						
						
									
										159
									
								
								campatmo.py
								
								
								
								
							|  | @ -3,8 +3,6 @@ import time | |||
| import sys | ||||
| import random | ||||
| from glob import glob | ||||
| from threading import Thread | ||||
| from typing import Union, List, Mapping | ||||
| from pygame import mixer | ||||
| 
 | ||||
| from atmorepl import AtmoReplRunner | ||||
|  | @ -12,110 +10,18 @@ from atmorepl import AtmoReplRunner | |||
| VERBOSITY = 0 | ||||
| 
 | ||||
| 
 | ||||
| class SoundCollection: | ||||
|     def __init__( | ||||
|         self, name, files: Union[List[str], Mapping[str, List[str]]], volume=0.5 | ||||
|     ): | ||||
|         self.name = name | ||||
|         self.topics = files.keys() if hasattr(files, "keys") else [] | ||||
|         if self.topics: | ||||
|             self.files = files | ||||
|         else: | ||||
|             self.files = {"None": files} | ||||
|         self.flat_files = [f for topic in self.files for f in self.files[topic]] | ||||
|         self.volume = volume | ||||
|         self.cur_topic = None | ||||
|         self.load_all_files() | ||||
|         self.sound_indices = {name: 0 for name in self.sounds} | ||||
|         self.flat_sound_index = 0 | ||||
| 
 | ||||
|     def load_all_files(self): | ||||
|         self.sounds = {} | ||||
|         if self.topics: | ||||
|             for t in self.topics: | ||||
|                 self.load_topic_files(t) | ||||
|         else: | ||||
|             self.load_topic_files("None") | ||||
|         self.flat_sounds = [s for topic in self.sounds for s in self.sounds[topic]] | ||||
|         random.shuffle(self.flat_sounds) | ||||
| 
 | ||||
|     def load_topic_files(self, topic): | ||||
|         self.sounds[topic] = [] | ||||
|         for f in self.files[topic]: | ||||
|             sound = mixer.Sound(f) | ||||
|             sound.set_volume(self.volume) | ||||
|             self.sounds[topic].append(sound) | ||||
|         random.shuffle(self.sounds[topic]) | ||||
| 
 | ||||
|     def next(self): | ||||
|         if self.cur_topic is None: | ||||
|             if len(self.flat_sounds) == 0: | ||||
|                 return | ||||
|             self.flat_sound_index += 1 | ||||
|             if self.flat_sound_index >= len(self.flat_sounds): | ||||
|                 random.shuffle(self.flat_sounds) | ||||
|                 self.flat_sound_index = 0 | ||||
|             return self.flat_sounds[self.flat_sound_index] | ||||
|         else: | ||||
|             if len(self.sounds[self.cur_topic]) == 0: | ||||
|                 return | ||||
|             self.sound_indices[self.cur_topic] += 1 | ||||
|             if self.sound_indices[self.cur_topic] >= len(self.sounds[self.cur_topic]): | ||||
|                 random.shuffle(self.sounds[self.cur_topic]) | ||||
|                 self.sound_indices[self.cur_topic] = 0 | ||||
|             return self.sounds[self.cur_topic][self.sound_indices[self.cur_topic]] | ||||
| 
 | ||||
|     def set_topic(self, topic=None): | ||||
|         if self.topics: | ||||
|             self.cur_topic = topic | ||||
| 
 | ||||
|     def __len__(self): | ||||
|         if self.cur_topic is None: | ||||
|             return len(self.flat_sounds) | ||||
|         else: | ||||
|             return len(self.sounds[self.cur_topic]) | ||||
| 
 | ||||
| 
 | ||||
| class CampAtmo: | ||||
|     tracks_per_type = 2 | ||||
|     single_track_types = ["lines"] | ||||
|     background_track_types = ["background", "buzzwords"] | ||||
|     lines_delta_sec = (5, 15) | ||||
|     line_reaction_wait_sec = 0.8 | ||||
|     sound_volume = 0.5 | ||||
| 
 | ||||
|     def __init__(self): | ||||
|         self.stopped = False | ||||
|         mixer.init(frequency=22050 * 2) | ||||
|         self.load_sounds() | ||||
| 
 | ||||
|     def start(self): | ||||
|         self.background_thread = Thread( | ||||
|             None, self.run_forever, "atmo-background", daemon=True | ||||
|         ) | ||||
|         self.background_thread.start() | ||||
|         self.lines_thread = Thread( | ||||
|             None, self.run_lines_forever, "atmo-lines", daemon=True | ||||
|         ) | ||||
|         self.lines_thread.start() | ||||
| 
 | ||||
|     def stop(): | ||||
|         self.stopped = True | ||||
|         self.lines_thread.join(2) | ||||
|         self.background_thread.join(0.5) | ||||
| 
 | ||||
|     def load_sounds(self): | ||||
|         type_names = glob("samples/*/") | ||||
|         type_files = {} | ||||
|         for type_dir in type_names: | ||||
|             type_name = type_dir.split("/")[1] | ||||
|             flat_files = glob(type_dir + "/*.ogg") | ||||
|             type_files[type_name] = flat_files | ||||
|             topic_files = glob(type_dir + "/*/*.ogg") | ||||
|             if topic_files: | ||||
|                 type_files[type_name] = {} | ||||
|                 for topic_dir in glob(type_dir + "/*/"): | ||||
|                     topic_name = topic_dir.split("/")[2] | ||||
|                     type_files[type_name][topic_name] = glob(type_dir + "/*/*.ogg") | ||||
|         type_files = {c.split("/")[1]: glob(c + "/*.ogg") for c in type_names} | ||||
|         self.types = {} | ||||
|         i = 0 | ||||
|         mixer.set_num_channels( | ||||
|  | @ -136,60 +42,51 @@ class CampAtmo: | |||
|                 i += 1 | ||||
|         self.sounds = {} | ||||
|         for name, files in type_files.items(): | ||||
|             self.sounds[name] = SoundCollection( | ||||
|                 name, files, volume=0.1 if name in self.background_track_types else 0.9 | ||||
|             ) | ||||
|             self.sounds[name] = [] | ||||
|             for f in files: | ||||
|                 sound = mixer.Sound(f) | ||||
|                 sound.set_volume(self.sound_volume) | ||||
|                 self.sounds[name].append(sound) | ||||
|             random.shuffle(self.sounds[name]) | ||||
| 
 | ||||
|     def manage_background_queue(self, name, type_channels): | ||||
|     def manage_type_queue(self, name, type_channels): | ||||
|         try: | ||||
|             next_sound_index = max( | ||||
|                 [ | ||||
|                     self.sounds[name].index(tt.get_queue() or tt.get_sound()) | ||||
|                     for tt in type_channels | ||||
|                 ] | ||||
|             ) | ||||
|         except ValueError: | ||||
|             next_sound_index = 0 | ||||
|         if next_sound_index >= len(self.sounds[name]): | ||||
|             random.shuffle(self.sounds[name]) | ||||
|             next_sound_index = 0 | ||||
|         for c in type_channels: | ||||
|             if c.get_queue() is not None and len(self.sounds[name]) > 1: | ||||
|                 continue | ||||
|             v(name) | ||||
|             c.queue(self.sounds[name].next()) | ||||
|             c.queue(self.sounds[name][next_sound_index]) | ||||
|             if c.get_queue() is None and len(self.sounds[name]) > 1: | ||||
|                 c.queue(self.sounds[name].next()) | ||||
|                 c.queue(self.sounds[name][next_sound_index + 1]) | ||||
| 
 | ||||
|     def run_forever(self): | ||||
|         while not self.stopped: | ||||
|         while True: | ||||
|             for name, type_channels in self.types.items(): | ||||
|                 if ( | ||||
|                     name not in self.background_track_types | ||||
|                     or len(self.sounds[name]) == 0 | ||||
|                 ): | ||||
|                 if len(self.sounds[name]) == 0 or name in self.single_track_types: | ||||
|                     continue | ||||
|                 self.manage_background_queue(name, type_channels) | ||||
|                 self.manage_type_queue(name, type_channels) | ||||
|             time.sleep(1) | ||||
|             self.load_sounds() | ||||
| 
 | ||||
|     def play_type_sounds(self, type_name) -> int: | ||||
|         if len(self.sounds[type_name]) == 0: | ||||
|             return | ||||
|         max_sound_length = 0 | ||||
|         for channel in self.types[type_name]: | ||||
|             sound = self.sounds[type_name].next() | ||||
|             max_sound_length = max(max_sound_length, sound.get_length()) | ||||
|             channel.queue(sound) | ||||
|         return max_sound_length | ||||
| 
 | ||||
|     def run_lines_forever(self): | ||||
|         while not self.stopped: | ||||
|             time.sleep(random.randrange(*self.lines_delta_sec)) | ||||
|             length_sec = self.play_type_sounds("lines") | ||||
|             time.sleep(length_sec + self.line_reaction_wait_sec) | ||||
|             self.play_type_sounds("reactions") | ||||
| 
 | ||||
| 
 | ||||
| def main(): | ||||
|     atmo = CampAtmo() | ||||
|     atmoRepl = AtmoReplRunner(atmo, port=7723) | ||||
|     atmoRepl.start() | ||||
|     atmo.start() | ||||
|     try: | ||||
|         while True: | ||||
|             time.sleep(100) | ||||
|         atmo.run_forever() | ||||
|     except KeyboardInterrupt: | ||||
|         print("exiting...") | ||||
|         atmo.stop() | ||||
|         print("exit") | ||||
|         sys.exit(0) | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue