162 lines
5.0 KiB
Python
162 lines
5.0 KiB
Python
|
import os
import sys
import time
from dataclasses import dataclass, field
from random import shuffle
from typing import Union, Tuple, List

# Set before pygame is imported; the variable name indicates it hides
# pygame's support prompt.
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "hide"

from pygame import mixer

from dirs import list_files_relative
|
||
|
|
||
|
|
||
|
@dataclass
class Sample:
    """One audio sample tracked by ``SampleCollection``.

    The ``mixer.Sound`` annotations are quoted so they are not evaluated when
    the class is created.
    """

    # Path as reported by the directory listing (used as the lookup key).
    path: str
    # ``path`` joined onto the collection's sample directory.
    abspath: str
    # Modification time and size recorded when ``sound`` was loaded; used to
    # detect that the file changed on disk.
    mtime: float
    size: int
    # Currently active sound object for this sample.
    sound: "mixer.Sound"
    # Sounds that were replaced by a reload, paired with the ``time.time()``
    # value at the moment they were replaced.
    old_sounds: List[Tuple["mixer.Sound", float]] = field(default_factory=list)
|
||
|
|
||
|
|
||
|
class SampleCollection:
    """Collection of audio samples loaded from a directory tree.

    Samples are handed out in shuffled order via :meth:`next`. Calling
    :meth:`load` again picks up new files and reloads files whose mtime or
    size changed. A replaced sound is kept in ``Sample.old_sounds`` until it
    is pruned by :meth:`get_sample_from_channel`.
    """

    # File extensions passed to ``list_files_relative`` when scanning.
    extensions = ["ogg"]

    def __init__(self, sample_dir: str, volume: float = 0.7):
        """Create the collection and immediately scan ``sample_dir``.

        Args:
            sample_dir: Directory that is scanned for sample files.
            volume: Volume applied to every loaded sound.
        """
        self.db = []  # all known Sample objects, in the current play order
        self.volume = volume
        self.topic_db = {}  # topic -> list of sample paths in that topic
        self.sample_dir = sample_dir
        self.index = 0  # position in ``db`` of the next sample to hand out
        self.load()

    def __len__(self):
        """Return the number of known samples."""
        return len(self.db)

    def load(self):
        """Scan ``sample_dir`` and add new or changed samples.

        ``list_files_relative`` is consumed as an iterable of ``(path, stat)``
        pairs; ``path`` is joined onto ``sample_dir`` to build the absolute
        path (presumably it is relative to that directory). Empty files are
        reported on stderr and skipped.
        """
        listing = list_files_relative(self.sample_dir, extensions=self.extensions)
        for path, stat in listing:
            db_sample = self.get_sample(path)
            # Compare against exactly the values stored by add_sample().
            # Truncating st_mtime to int here would never match the stored
            # float and would force a reload on every call.
            if (
                db_sample is not None
                and db_sample.mtime == stat.st_mtime
                and db_sample.size == stat.st_size
            ):
                continue
            abspath = os.path.join(self.sample_dir, path)
            if stat.st_size == 0:
                print(f"Warning, empty file: {abspath!r}", file=sys.stderr)
                continue
            # The topic is everything before the last "/" of the listed path;
            # files without a directory part have no topic.
            last_sep = path.rfind("/")
            topic = path[:last_sep] if last_sep > 0 else None
            self.add_sample(path, stat, abspath, topic, db_sample)

    def add_sample(
        self,
        path: str,
        stat: os.stat_result,
        abspath: str,
        topic: Union[str, None],
        known_sample: Union["Sample", None],
    ):
        """Register a new sample or reload the sound of a known one.

        Args:
            path: Path of the file as reported by the directory listing.
            stat: Stat result of the file; ``st_mtime`` and ``st_size`` are
                stored on the sample.
            abspath: Path handed to ``mixer.Sound``.
            topic: Topic to file a *new* sample under, or ``None``.
            known_sample: Existing sample for ``path``, or ``None`` when the
                file has not been seen before.
        """
        # Load first, so a failing load leaves the collection untouched.
        sound = mixer.Sound(abspath)
        sound.set_volume(self.volume)

        if known_sample is None:
            new_sample = Sample(
                path=path,
                mtime=stat.st_mtime,
                size=stat.st_size,
                abspath=abspath,
                sound=sound,
            )
            self.db.append(new_sample)
            if topic is not None:
                self.topic_db.setdefault(topic, []).append(new_sample.path)
            return

        # Keep the replaced sound so get_sample_from_channel() can still map
        # it back to this sample.
        known_sample.old_sounds.append((known_sample.sound, time.time()))
        known_sample.sound = sound
        known_sample.mtime = stat.st_mtime
        known_sample.size = stat.st_size

    def set_volume(self, volume):
        """Store ``volume`` and apply it to the current sound of every sample."""
        self.volume = volume
        for sample in self.db:
            sample.sound.set_volume(volume)

    def next(self):
        """Return the next sample, reshuffling at the start of each cycle.

        Raises:
            IndexError: If the collection is empty.
        """
        if not self.db:
            raise IndexError("sample collection is empty")
        if self.index == 0:
            shuffle(self.db)
        ret_sample = self.db[self.index]
        self.index = (self.index + 1) % len(self.db)
        return ret_sample

    def next_sound(self):
        """Return the sound of the next sample (see :meth:`next`)."""
        return self.next().sound

    def next_by_topic(self, topic: str):
        """Return the next sample that belongs to ``topic``.

        Advances through the collection with :meth:`next` until a matching
        sample is found.

        Returns:
            The matching sample, or ``None`` if the topic is unknown or no
            sample matches.
        """
        topic_paths = self.topic_db.get(topic)
        if not topic_paths:
            return None
        wanted = set(topic_paths)

        # One full sweep at most. The match is returned even when it is found
        # exactly as the index wraps back to where the sweep started.
        for _ in range(len(self.db)):
            sample = self.next()
            if sample.path in wanted:
                return sample
            print(f"trying sample {sample.path!r} for topic {topic!r}")

        # next() reshuffles when the index wraps, so one sweep can miss a
        # sample; fall back to a direct scan of the collection.
        return next((s for s in self.db if s.path in wanted), None)

    def next_sound_by_topic(self, topic: str):
        """Return the sound of the next sample in ``topic``, or ``None``."""
        next_sample = self.next_by_topic(topic)
        if next_sample is not None:
            return next_sample.sound
        return None

    def get_sample(self, path: str) -> Union["Sample", None]:
        """Return the sample whose ``path`` equals ``path``, or ``None``."""
        for sample in self.db:
            if sample.path == path:
                return sample
        return None

    def get_sample_from_channel(self, channel):
        """Return the sample whose sound ``channel.get_sound()`` reports.

        Current sounds are checked first, then sounds that were replaced by a
        reload. As a side effect, replaced sounds are pruned once the time
        since their replacement is at least their length (or is negative).

        Returns:
            The matching sample, or ``None`` when the channel reports no sound
            or the sound belongs to no known sample.
        """
        c_sound = channel.get_sound()
        if c_sound is None:
            return None

        ret_sample = None
        for sample in self.db:
            if sample.sound == c_sound:
                ret_sample = sample
        if ret_sample is None:
            for sample in self.db:
                for old_sound, _ in sample.old_sounds:
                    if old_sound == c_sound:
                        ret_sample = sample

        # Prune in place so existing references to the list stay valid.
        now = time.time()
        for sample in self.db:
            sample.old_sounds[:] = [
                (old_sound, replaced_at)
                for old_sound, replaced_at in sample.old_sounds
                if 0 <= now - replaced_at < old_sound.get_length()
            ]
        return ret_sample
|
||
|
|
||
|
|
||
|
# Manual smoke test, kept commented out:
# if __name__ == "__main__":
#     mixer.init()
#     chan = mixer.Channel(0)
#     coll = SampleCollection("/home/jakob/dev/seba-audio/gincloud/samples/buzzwords")
#     chan.queue(coll.next_sound_by_topic("agile"))
#     chan.queue(coll.next_sound_by_topic("cloud"))
#     print(coll.get_sample_from_channel(chan).path)
#     time.sleep(7)
#     print(coll.get_sample_from_channel(chan).path)
|