"""Shuffled collection of pygame sound samples loaded from a directory tree."""

import os
import sys
import time
from dataclasses import dataclass, field
from random import shuffle
from typing import Dict, List, Tuple, Union

# Must be set before pygame is imported so that its banner is suppressed.
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "hide"
from pygame import mixer

from dirs import list_files_relative


@dataclass
class Sample:
    """One loaded audio file together with the file metadata seen at load time."""

    # Path as yielded by list_files_relative() for the sample directory.
    path: str
    # `path` joined onto the collection's sample directory.
    abspath: str
    # st_mtime / st_size recorded when the sound was (re)loaded; used by
    # SampleCollection.load() to detect changed files.
    mtime: float
    size: int
    # Currently active sound object for this file.
    sound: mixer.Sound
    # Sounds replaced by a reload, paired with the time.time() of replacement.
    old_sounds: List[Tuple[mixer.Sound, float]] = field(default_factory=list)


class SampleCollection:
    """Collection of samples from a directory, served in shuffled order.

    Samples whose path contains a "/" are additionally grouped under a
    topic, which is the part of the path before the last "/".
    """

    extensions = ["ogg"]

    def __init__(self, sample_dir: str, volume: float = 0.7):
        """Create the collection and immediately load `sample_dir`.

        Args:
            sample_dir: Directory that is scanned for sample files.
            volume: Volume applied to every loaded sound.
        """
        self.db: List[Sample] = []
        self.volume = volume
        # topic -> list of sample paths belonging to that topic
        self.topic_db: Dict[str, List[str]] = {}
        self.sample_dir = sample_dir
        # Position of the next sample to hand out from the shuffled db.
        self.index = 0
        self.load()

    def __len__(self):
        """Return the number of samples in the collection."""
        return len(self.db)

    def load(self):
        """Scan the sample directory and load new or changed files.

        Files whose recorded mtime and size are unchanged are skipped.
        Empty files are reported on stderr and skipped.
        """
        # NOTE(review): list_files_relative is assumed to yield
        # (relative path, stat result) pairs - confirm against dirs.py.
        for path, stat in list_files_relative(
            self.sample_dir, extensions=self.extensions
        ):
            db_sample = self.get_sample(path)
            # Compare against the same value add_sample() stores; truncating
            # st_mtime here would make every file with a fractional mtime look
            # modified on each call.
            if (
                db_sample is not None
                and db_sample.mtime == stat.st_mtime
                and db_sample.size == stat.st_size
            ):
                continue

            abspath = os.path.join(self.sample_dir, path)
            if stat.st_size == 0:
                print(f"Warning, empty file: {abspath!r}", file=sys.stderr)
                continue

            # The topic is everything before the last "/" of the path;
            # paths without such a prefix have no topic.
            last_sep = path.rfind("/")
            topic = path[:last_sep] if last_sep > 0 else None

            self.add_sample(path, stat, abspath, topic, db_sample)

    def add_sample(
        self,
        path: str,
        stat: os.stat_result,
        abspath: str,
        topic: Union[str, None],
        known_sample: Union[Sample, None],
    ):
        """Register a new sample or refresh the sound of a known one.

        Args:
            path: Path of the file as stored in `Sample.path`.
            stat: Stat result supplying `st_mtime` and `st_size`.
            abspath: Path passed to `mixer.Sound` to load the audio.
            topic: Topic to file a new sample under, or None.
            known_sample: Existing sample for `path`, or None if it is new.
        """
        if known_sample is None:
            new_sample = Sample(
                path=path,
                mtime=stat.st_mtime,
                size=stat.st_size,
                abspath=abspath,
                sound=mixer.Sound(abspath),
            )
            new_sample.sound.set_volume(self.volume)
            self.db.append(new_sample)
            if topic is not None:
                self.topic_db.setdefault(topic, []).append(new_sample.path)
            return

        # Keep the replaced sound around so get_sample_from_channel() can
        # still map it back to this sample.
        known_sample.old_sounds.append((known_sample.sound, time.time()))
        known_sample.sound = mixer.Sound(abspath)
        known_sample.sound.set_volume(self.volume)
        known_sample.mtime = stat.st_mtime
        known_sample.size = stat.st_size

    def set_volume(self, volume):
        """Store `volume` and apply it to the current sound of every sample."""
        self.volume = volume
        for sample in self.db:
            sample.sound.set_volume(volume)

    def next(self):
        """Return the next sample, reshuffling whenever a new lap starts.

        Raises:
            IndexError: If the collection is empty.
        """
        if not self.db:
            raise IndexError("cannot pick a sample from an empty collection")
        if self.index == 0:
            shuffle(self.db)
        ret_sample = self.db[self.index]
        self.index = (self.index + 1) % len(self.db)
        return ret_sample

    def next_sound(self):
        """Return the sound of the next sample."""
        return self.next().sound

    def next_by_topic(self, topic: str):
        """Advance through the collection until a sample of `topic` comes up.

        Returns:
            The matching sample, or None if the topic is unknown.
        """
        topic_paths = self.topic_db.get(topic)
        if not topic_paths:
            return None

        wanted = set(topic_paths)
        # The rest of the current lap plus one complete freshly shuffled lap
        # is guaranteed to visit every sample, so two laps bound the search.
        for _ in range(2 * len(self.db)):
            sample = self.next()
            if sample.path in wanted:
                return sample
            print(f"trying sample {sample.path!r} for topic {topic!r}")
        return None

    def next_sound_by_topic(self, topic: str):
        """Return the sound of the next sample of `topic`, or None."""
        next_sample = self.next_by_topic(topic)
        if next_sample is not None:
            return next_sample.sound
        return None

    def get_sample(self, path: str) -> Union[Sample, None]:
        """Return the sample whose `path` equals `path`, or None."""
        return next((sample for sample in self.db if sample.path == path), None)

    def get_sample_from_channel(self, channel):
        """Return the sample whose sound `channel` reports via get_sound().

        The current sound of each sample is checked first, then sounds that
        were replaced by a reload. Expired replaced sounds are purged before
        returning. Returns None if the channel has no sound or no sample
        matches.
        """
        c_sound = channel.get_sound()
        if c_sound is None:
            return None

        ret_sample = self._find_sample_by_sound(c_sound)
        self._purge_expired_sounds()
        return ret_sample

    def _find_sample_by_sound(self, sound):
        """Return the sample owning `sound` as current or replaced sound."""
        for sample in self.db:
            if sample.sound == sound:
                return sample
        for sample in self.db:
            if any(old_sound == sound for old_sound, _ in sample.old_sounds):
                return sample
        return None

    def _purge_expired_sounds(self):
        """Drop replaced sounds older than their own length.

        Entries whose timestamp lies in the future are dropped as well.
        """
        now = time.time()
        for sample in self.db:
            # Slice assignment keeps the same list object on the sample.
            sample.old_sounds[:] = [
                (old_sound, replaced_at)
                for old_sound, replaced_at in sample.old_sounds
                if 0 <= now - replaced_at < old_sound.get_length()
            ]


# if __name__ == "__main__":
#     mixer.init()
#     chan = mixer.Channel(0)
#     coll = SampleCollection("/home/jakob/dev/seba-audio/gincloud/samples/buzzwords")
#     chan.queue(coll.next_sound_by_topic("agile"))
#     chan.queue(coll.next_sound_by_topic("cloud"))
#     print(coll.get_sample_from_channel(chan).path)
#     time.sleep(7)
#     print(coll.get_sample_from_channel(chan).path)