Compare commits
No commits in common. "0c425be8091eb46ab4a38bfb605cb89644794a1e" and "87f6bf8e211258d6f620b401371ba3c77582db4d" have entirely different histories.
0c425be809
...
87f6bf8e21
159
campatmo.py
159
campatmo.py
|
@ -3,8 +3,6 @@ import time
|
|||
import sys
|
||||
import random
|
||||
from glob import glob
|
||||
from threading import Thread
|
||||
from typing import Union, List, Mapping
|
||||
from pygame import mixer
|
||||
|
||||
from atmorepl import AtmoReplRunner
|
||||
|
@ -12,110 +10,18 @@ from atmorepl import AtmoReplRunner
|
|||
VERBOSITY = 0
|
||||
|
||||
|
||||
class SoundCollection:
|
||||
def __init__(
|
||||
self, name, files: Union[List[str], Mapping[str, List[str]]], volume=0.5
|
||||
):
|
||||
self.name = name
|
||||
self.topics = files.keys() if hasattr(files, "keys") else []
|
||||
if self.topics:
|
||||
self.files = files
|
||||
else:
|
||||
self.files = {"None": files}
|
||||
self.flat_files = [f for topic in self.files for f in self.files[topic]]
|
||||
self.volume = volume
|
||||
self.cur_topic = None
|
||||
self.load_all_files()
|
||||
self.sound_indices = {name: 0 for name in self.sounds}
|
||||
self.flat_sound_index = 0
|
||||
|
||||
def load_all_files(self):
|
||||
self.sounds = {}
|
||||
if self.topics:
|
||||
for t in self.topics:
|
||||
self.load_topic_files(t)
|
||||
else:
|
||||
self.load_topic_files("None")
|
||||
self.flat_sounds = [s for topic in self.sounds for s in self.sounds[topic]]
|
||||
random.shuffle(self.flat_sounds)
|
||||
|
||||
def load_topic_files(self, topic):
|
||||
self.sounds[topic] = []
|
||||
for f in self.files[topic]:
|
||||
sound = mixer.Sound(f)
|
||||
sound.set_volume(self.volume)
|
||||
self.sounds[topic].append(sound)
|
||||
random.shuffle(self.sounds[topic])
|
||||
|
||||
def next(self):
|
||||
if self.cur_topic is None:
|
||||
if len(self.flat_sounds) == 0:
|
||||
return
|
||||
self.flat_sound_index += 1
|
||||
if self.flat_sound_index >= len(self.flat_sounds):
|
||||
random.shuffle(self.flat_sounds)
|
||||
self.flat_sound_index = 0
|
||||
return self.flat_sounds[self.flat_sound_index]
|
||||
else:
|
||||
if len(self.sounds[self.cur_topic]) == 0:
|
||||
return
|
||||
self.sound_indices[self.cur_topic] += 1
|
||||
if self.sound_indices[self.cur_topic] >= len(self.sounds[self.cur_topic]):
|
||||
random.shuffle(self.sounds[self.cur_topic])
|
||||
self.sound_indices[self.cur_topic] = 0
|
||||
return self.sounds[self.cur_topic][self.sound_indices[self.cur_topic]]
|
||||
|
||||
def set_topic(self, topic=None):
|
||||
if self.topics:
|
||||
self.cur_topic = topic
|
||||
|
||||
def __len__(self):
|
||||
if self.cur_topic is None:
|
||||
return len(self.flat_sounds)
|
||||
else:
|
||||
return len(self.sounds[self.cur_topic])
|
||||
|
||||
|
||||
class CampAtmo:
|
||||
tracks_per_type = 2
|
||||
single_track_types = ["lines"]
|
||||
background_track_types = ["background", "buzzwords"]
|
||||
lines_delta_sec = (5, 15)
|
||||
line_reaction_wait_sec = 0.8
|
||||
sound_volume = 0.5
|
||||
|
||||
def __init__(self):
|
||||
self.stopped = False
|
||||
mixer.init(frequency=22050 * 2)
|
||||
self.load_sounds()
|
||||
|
||||
def start(self):
|
||||
self.background_thread = Thread(
|
||||
None, self.run_forever, "atmo-background", daemon=True
|
||||
)
|
||||
self.background_thread.start()
|
||||
self.lines_thread = Thread(
|
||||
None, self.run_lines_forever, "atmo-lines", daemon=True
|
||||
)
|
||||
self.lines_thread.start()
|
||||
|
||||
def stop():
|
||||
self.stopped = True
|
||||
self.lines_thread.join(2)
|
||||
self.background_thread.join(0.5)
|
||||
|
||||
def load_sounds(self):
|
||||
type_names = glob("samples/*/")
|
||||
type_files = {}
|
||||
for type_dir in type_names:
|
||||
type_name = type_dir.split("/")[1]
|
||||
flat_files = glob(type_dir + "/*.ogg")
|
||||
type_files[type_name] = flat_files
|
||||
topic_files = glob(type_dir + "/*/*.ogg")
|
||||
if topic_files:
|
||||
type_files[type_name] = {}
|
||||
for topic_dir in glob(type_dir + "/*/"):
|
||||
topic_name = topic_dir.split("/")[2]
|
||||
type_files[type_name][topic_name] = glob(type_dir + "/*/*.ogg")
|
||||
type_files = {c.split("/")[1]: glob(c + "/*.ogg") for c in type_names}
|
||||
self.types = {}
|
||||
i = 0
|
||||
mixer.set_num_channels(
|
||||
|
@ -136,60 +42,51 @@ class CampAtmo:
|
|||
i += 1
|
||||
self.sounds = {}
|
||||
for name, files in type_files.items():
|
||||
self.sounds[name] = SoundCollection(
|
||||
name, files, volume=0.1 if name in self.background_track_types else 0.9
|
||||
)
|
||||
self.sounds[name] = []
|
||||
for f in files:
|
||||
sound = mixer.Sound(f)
|
||||
sound.set_volume(self.sound_volume)
|
||||
self.sounds[name].append(sound)
|
||||
random.shuffle(self.sounds[name])
|
||||
|
||||
def manage_background_queue(self, name, type_channels):
|
||||
def manage_type_queue(self, name, type_channels):
|
||||
try:
|
||||
next_sound_index = max(
|
||||
[
|
||||
self.sounds[name].index(tt.get_queue() or tt.get_sound())
|
||||
for tt in type_channels
|
||||
]
|
||||
)
|
||||
except ValueError:
|
||||
next_sound_index = 0
|
||||
if next_sound_index >= len(self.sounds[name]):
|
||||
random.shuffle(self.sounds[name])
|
||||
next_sound_index = 0
|
||||
for c in type_channels:
|
||||
if c.get_queue() is not None and len(self.sounds[name]) > 1:
|
||||
continue
|
||||
v(name)
|
||||
c.queue(self.sounds[name].next())
|
||||
c.queue(self.sounds[name][next_sound_index])
|
||||
if c.get_queue() is None and len(self.sounds[name]) > 1:
|
||||
c.queue(self.sounds[name].next())
|
||||
c.queue(self.sounds[name][next_sound_index + 1])
|
||||
|
||||
def run_forever(self):
|
||||
while not self.stopped:
|
||||
while True:
|
||||
for name, type_channels in self.types.items():
|
||||
if (
|
||||
name not in self.background_track_types
|
||||
or len(self.sounds[name]) == 0
|
||||
):
|
||||
if len(self.sounds[name]) == 0 or name in self.single_track_types:
|
||||
continue
|
||||
self.manage_background_queue(name, type_channels)
|
||||
self.manage_type_queue(name, type_channels)
|
||||
time.sleep(1)
|
||||
self.load_sounds()
|
||||
|
||||
def play_type_sounds(self, type_name) -> int:
|
||||
if len(self.sounds[type_name]) == 0:
|
||||
return
|
||||
max_sound_length = 0
|
||||
for channel in self.types[type_name]:
|
||||
sound = self.sounds[type_name].next()
|
||||
max_sound_length = max(max_sound_length, sound.get_length())
|
||||
channel.queue(sound)
|
||||
return max_sound_length
|
||||
|
||||
def run_lines_forever(self):
|
||||
while not self.stopped:
|
||||
time.sleep(random.randrange(*self.lines_delta_sec))
|
||||
length_sec = self.play_type_sounds("lines")
|
||||
time.sleep(length_sec + self.line_reaction_wait_sec)
|
||||
self.play_type_sounds("reactions")
|
||||
|
||||
|
||||
def main():
|
||||
atmo = CampAtmo()
|
||||
atmoRepl = AtmoReplRunner(atmo, port=7723)
|
||||
atmoRepl.start()
|
||||
atmo.start()
|
||||
try:
|
||||
while True:
|
||||
time.sleep(100)
|
||||
atmo.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("exiting...")
|
||||
atmo.stop()
|
||||
print("exit")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue