Compare commits

..

2 Commits

Author SHA1 Message Date
Jakob 0c425be809 Add lines playback and put lines+bg into threads 2019-08-20 16:43:47 +02:00
Jakob 40eca4c369 Add SoundCollection class with topics 2019-08-20 16:16:26 +02:00
2 changed files with 131 additions and 28 deletions

View File

@ -3,6 +3,8 @@ import time
import sys import sys
import random import random
from glob import glob from glob import glob
from threading import Thread
from typing import Union, List, Mapping
from pygame import mixer from pygame import mixer
from atmorepl import AtmoReplRunner from atmorepl import AtmoReplRunner
@ -10,18 +12,110 @@ from atmorepl import AtmoReplRunner
VERBOSITY = 0 VERBOSITY = 0
class SoundCollection:
def __init__(
self, name, files: Union[List[str], Mapping[str, List[str]]], volume=0.5
):
self.name = name
self.topics = files.keys() if hasattr(files, "keys") else []
if self.topics:
self.files = files
else:
self.files = {"None": files}
self.flat_files = [f for topic in self.files for f in self.files[topic]]
self.volume = volume
self.cur_topic = None
self.load_all_files()
self.sound_indices = {name: 0 for name in self.sounds}
self.flat_sound_index = 0
def load_all_files(self):
self.sounds = {}
if self.topics:
for t in self.topics:
self.load_topic_files(t)
else:
self.load_topic_files("None")
self.flat_sounds = [s for topic in self.sounds for s in self.sounds[topic]]
random.shuffle(self.flat_sounds)
def load_topic_files(self, topic):
self.sounds[topic] = []
for f in self.files[topic]:
sound = mixer.Sound(f)
sound.set_volume(self.volume)
self.sounds[topic].append(sound)
random.shuffle(self.sounds[topic])
def next(self):
if self.cur_topic is None:
if len(self.flat_sounds) == 0:
return
self.flat_sound_index += 1
if self.flat_sound_index >= len(self.flat_sounds):
random.shuffle(self.flat_sounds)
self.flat_sound_index = 0
return self.flat_sounds[self.flat_sound_index]
else:
if len(self.sounds[self.cur_topic]) == 0:
return
self.sound_indices[self.cur_topic] += 1
if self.sound_indices[self.cur_topic] >= len(self.sounds[self.cur_topic]):
random.shuffle(self.sounds[self.cur_topic])
self.sound_indices[self.cur_topic] = 0
return self.sounds[self.cur_topic][self.sound_indices[self.cur_topic]]
def set_topic(self, topic=None):
if self.topics:
self.cur_topic = topic
def __len__(self):
if self.cur_topic is None:
return len(self.flat_sounds)
else:
return len(self.sounds[self.cur_topic])
class CampAtmo: class CampAtmo:
tracks_per_type = 2 tracks_per_type = 2
single_track_types = ["lines"] single_track_types = ["lines"]
sound_volume = 0.5 background_track_types = ["background", "buzzwords"]
lines_delta_sec = (5, 15)
line_reaction_wait_sec = 0.8
def __init__(self): def __init__(self):
self.stopped = False
mixer.init(frequency=22050 * 2) mixer.init(frequency=22050 * 2)
self.load_sounds() self.load_sounds()
def start(self):
self.background_thread = Thread(
None, self.run_forever, "atmo-background", daemon=True
)
self.background_thread.start()
self.lines_thread = Thread(
None, self.run_lines_forever, "atmo-lines", daemon=True
)
self.lines_thread.start()
def stop():
self.stopped = True
self.lines_thread.join(2)
self.background_thread.join(0.5)
def load_sounds(self): def load_sounds(self):
type_names = glob("samples/*/") type_names = glob("samples/*/")
type_files = {c.split("/")[1]: glob(c + "/*.ogg") for c in type_names} type_files = {}
for type_dir in type_names:
type_name = type_dir.split("/")[1]
flat_files = glob(type_dir + "/*.ogg")
type_files[type_name] = flat_files
topic_files = glob(type_dir + "/*/*.ogg")
if topic_files:
type_files[type_name] = {}
for topic_dir in glob(type_dir + "/*/"):
topic_name = topic_dir.split("/")[2]
type_files[type_name][topic_name] = glob(type_dir + "/*/*.ogg")
self.types = {} self.types = {}
i = 0 i = 0
mixer.set_num_channels( mixer.set_num_channels(
@ -42,51 +136,60 @@ class CampAtmo:
i += 1 i += 1
self.sounds = {} self.sounds = {}
for name, files in type_files.items(): for name, files in type_files.items():
self.sounds[name] = [] self.sounds[name] = SoundCollection(
for f in files: name, files, volume=0.1 if name in self.background_track_types else 0.9
sound = mixer.Sound(f)
sound.set_volume(self.sound_volume)
self.sounds[name].append(sound)
random.shuffle(self.sounds[name])
def manage_type_queue(self, name, type_channels):
try:
next_sound_index = max(
[
self.sounds[name].index(tt.get_queue() or tt.get_sound())
for tt in type_channels
]
) )
except ValueError:
next_sound_index = 0 def manage_background_queue(self, name, type_channels):
if next_sound_index >= len(self.sounds[name]):
random.shuffle(self.sounds[name])
next_sound_index = 0
for c in type_channels: for c in type_channels:
if c.get_queue() is not None and len(self.sounds[name]) > 1: if c.get_queue() is not None and len(self.sounds[name]) > 1:
continue continue
c.queue(self.sounds[name][next_sound_index]) v(name)
c.queue(self.sounds[name].next())
if c.get_queue() is None and len(self.sounds[name]) > 1: if c.get_queue() is None and len(self.sounds[name]) > 1:
c.queue(self.sounds[name][next_sound_index + 1]) c.queue(self.sounds[name].next())
def run_forever(self): def run_forever(self):
while True: while not self.stopped:
for name, type_channels in self.types.items(): for name, type_channels in self.types.items():
if len(self.sounds[name]) == 0 or name in self.single_track_types: if (
name not in self.background_track_types
or len(self.sounds[name]) == 0
):
continue continue
self.manage_type_queue(name, type_channels) self.manage_background_queue(name, type_channels)
time.sleep(1) time.sleep(1)
self.load_sounds() self.load_sounds()
def play_type_sounds(self, type_name) -> int:
if len(self.sounds[type_name]) == 0:
return
max_sound_length = 0
for channel in self.types[type_name]:
sound = self.sounds[type_name].next()
max_sound_length = max(max_sound_length, sound.get_length())
channel.queue(sound)
return max_sound_length
def run_lines_forever(self):
while not self.stopped:
time.sleep(random.randrange(*self.lines_delta_sec))
length_sec = self.play_type_sounds("lines")
time.sleep(length_sec + self.line_reaction_wait_sec)
self.play_type_sounds("reactions")
def main(): def main():
atmo = CampAtmo() atmo = CampAtmo()
atmoRepl = AtmoReplRunner(atmo, port=7723) atmoRepl = AtmoReplRunner(atmo, port=7723)
atmoRepl.start() atmoRepl.start()
atmo.start()
try: try:
atmo.run_forever() while True:
time.sleep(100)
except KeyboardInterrupt: except KeyboardInterrupt:
print("exit") print("exiting...")
atmo.stop()
sys.exit(0) sys.exit(0)