25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.

162 satır
5.0 KiB

import os
import time
from random import shuffle
from dataclasses import dataclass, field
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "hide"
from pygame import mixer
from typing import Union, Tuple, List
from dirs import list_files_relative
@dataclass
class Sample:
path: str
abspath: str
mtime: float
size: int
sound: mixer.Sound
old_sounds: List[Tuple[mixer.Sound, float]] = field(default_factory=list)
class SampleCollection:
extensions = ["ogg"]
def __init__(self, sample_dir: str, volume: float = 0.7):
self.db = []
self.volume = volume
self.topic_db = {}
self.sample_dir = sample_dir
self.index = 0
self.load()
def __len__(self):
return len(self.db)
def load(self):
prev_prefix = ""
for path, stat in list_files_relative(
self.sample_dir, extensions=self.extensions
):
db_sample = self.get_sample(path)
if (
db_sample is not None
and db_sample.mtime == int(stat.st_mtime)
and db_sample.size == stat.st_size
):
continue
abspath = os.path.join(self.sample_dir, path)
if stat.st_size == 0:
print(f"Warning, empty file: {abspath!r}", file=sys.stderr)
continue
topic = None
last_sep = path.rfind("/")
if last_sep > 0:
prefix = path[:last_sep]
topic = prefix
if prev_prefix != prefix:
prev_prefix = prefix
self.add_sample(path, stat, abspath, topic, db_sample)
def add_sample(
self,
path: str,
stat: os.stat_result,
abspath: str,
topic: Union[str, None],
known_sample: Sample,
):
if known_sample is None:
new_sample = Sample(
path=path,
mtime=stat.st_mtime,
size=stat.st_size,
abspath=abspath,
sound=mixer.Sound(abspath),
)
new_sample.sound.set_volume(self.volume)
self.db.append(new_sample)
if topic is not None:
if topic not in self.topic_db:
self.topic_db[topic] = [new_sample.path]
else:
self.topic_db[topic].append(new_sample.path)
else:
known_sample.old_sounds.append((known_sample.sound, time.time()))
known_sample.sound = mixer.Sound(abspath)
known_sample.sound.set_volume(self.volume)
known_sample.mtime = stat.st_mtime
known_sample.size = stat.st_size
def set_volume(self, volume):
self.volume = volume
for sample in self.db:
sample.sound.set_volume(volume)
def next(self):
if self.index == 0:
shuffle(self.db)
ret_sample = self.db[self.index]
self.index = (self.index + 1) % len(self.db)
return ret_sample
def next_sound(self):
return self.next().sound
def next_by_topic(self, topic: str):
if topic not in self.topic_db:
return
start_index = self.index
sample = self.next()
while sample.path not in self.topic_db[topic] and self.index != start_index:
print(f"trying sample {sample.path!r} for topic {topic!r}")
sample = self.next()
if self.index == start_index:
return
return sample
def next_sound_by_topic(self, topic: str):
next_sample = self.next_by_topic(topic)
if next_sample is not None:
return next_sample.sound
def get_sample(self, path: str) -> Union[dict, None]:
for sample in self.db:
if sample.path == path:
return sample
def get_sample_from_channel(self, channel):
ret_sample = None
c_sound = channel.get_sound()
if c_sound is None:
return None
for sample in self.db:
if sample.sound == c_sound:
ret_sample = sample
if ret_sample is None:
for sample in self.db:
for o_sample, _ in sample.old_sounds:
if o_sample == c_sound:
ret_sample = sample
for sample in self.db:
expired = []
for i, (o_sample, exp_time) in enumerate(sample.old_sounds):
if ((time.time() - exp_time) >= o_sample.get_length()) or (
time.time() - exp_time < 0
):
expired.append(i)
for i in reversed(expired):
sample.old_sounds.pop(i)
return ret_sample
# if __name__ == "__main__":
# mixer.init()
# chan = mixer.Channel(0)
# coll = SampleCollection("/home/jakob/dev/seba-audio/gincloud/samples/buzzwords")
# chan.queue(coll.next_sound_by_topic("agile"))
# chan.queue(coll.next_sound_by_topic("cloud"))
# print(coll.get_sample_from_channel(chan).path)
# time.sleep(7)
# print(coll.get_sample_from_channel(chan).path)