Tu's in die Cloud, höhöhö....
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

collection.py 5.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import os
  2. import time
  3. from random import shuffle
  4. from dataclasses import dataclass, field
  5. os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "hide"
  6. from pygame import mixer
  7. from typing import Union, Tuple, List
  8. from dirs import list_files_relative
  9. @dataclass
  10. class Sample:
  11. path: str
  12. abspath: str
  13. mtime: float
  14. size: int
  15. sound: mixer.Sound
  16. old_sounds: List[Tuple[mixer.Sound, float]] = field(default_factory=list)
  17. class SampleCollection:
  18. extensions = ["ogg"]
  19. def __init__(self, sample_dir: str, volume: float = 0.7):
  20. self.db = []
  21. self.volume = volume
  22. self.topic_db = {}
  23. self.sample_dir = sample_dir
  24. self.index = 0
  25. self.load()
  26. def __len__(self):
  27. return len(self.db)
  28. def load(self):
  29. prev_prefix = ""
  30. for path, stat in list_files_relative(
  31. self.sample_dir, extensions=self.extensions
  32. ):
  33. db_sample = self.get_sample(path)
  34. if (
  35. db_sample is not None
  36. and db_sample.mtime == int(stat.st_mtime)
  37. and db_sample.size == stat.st_size
  38. ):
  39. continue
  40. abspath = os.path.join(self.sample_dir, path)
  41. if stat.st_size == 0:
  42. print(f"Warning, empty file: {abspath!r}", file=sys.stderr)
  43. continue
  44. topic = None
  45. last_sep = path.rfind("/")
  46. if last_sep > 0:
  47. prefix = path[:last_sep]
  48. topic = prefix
  49. if prev_prefix != prefix:
  50. prev_prefix = prefix
  51. self.add_sample(path, stat, abspath, topic, db_sample)
  52. def add_sample(
  53. self,
  54. path: str,
  55. stat: os.stat_result,
  56. abspath: str,
  57. topic: Union[str, None],
  58. known_sample: Sample,
  59. ):
  60. if known_sample is None:
  61. new_sample = Sample(
  62. path=path,
  63. mtime=stat.st_mtime,
  64. size=stat.st_size,
  65. abspath=abspath,
  66. sound=mixer.Sound(abspath),
  67. )
  68. new_sample.sound.set_volume(self.volume)
  69. self.db.append(new_sample)
  70. if topic is not None:
  71. if topic not in self.topic_db:
  72. self.topic_db[topic] = [new_sample.path]
  73. else:
  74. self.topic_db[topic].append(new_sample.path)
  75. else:
  76. known_sample.old_sounds.append((known_sample.sound, time.time()))
  77. known_sample.sound = mixer.Sound(abspath)
  78. known_sample.sound.set_volume(self.volume)
  79. known_sample.mtime = stat.st_mtime
  80. known_sample.size = stat.st_size
  81. def set_volume(self, volume):
  82. self.volume = volume
  83. for sample in self.db:
  84. sample.sound.set_volume(volume)
  85. def next(self):
  86. if self.index == 0:
  87. shuffle(self.db)
  88. ret_sample = self.db[self.index]
  89. self.index = (self.index + 1) % len(self.db)
  90. return ret_sample
  91. def next_sound(self):
  92. return self.next().sound
  93. def next_by_topic(self, topic: str):
  94. if topic not in self.topic_db:
  95. return
  96. start_index = self.index
  97. sample = self.next()
  98. while sample.path not in self.topic_db[topic] and self.index != start_index:
  99. print(f"trying sample {sample.path!r} for topic {topic!r}")
  100. sample = self.next()
  101. if self.index == start_index:
  102. return
  103. return sample
  104. def next_sound_by_topic(self, topic: str):
  105. next_sample = self.next_by_topic(topic)
  106. if next_sample is not None:
  107. return next_sample.sound
  108. def get_sample(self, path: str) -> Union[dict, None]:
  109. for sample in self.db:
  110. if sample.path == path:
  111. return sample
  112. def get_sample_from_channel(self, channel):
  113. ret_sample = None
  114. c_sound = channel.get_sound()
  115. if c_sound is None:
  116. return None
  117. for sample in self.db:
  118. if sample.sound == c_sound:
  119. ret_sample = sample
  120. if ret_sample is None:
  121. for sample in self.db:
  122. for o_sample, _ in sample.old_sounds:
  123. if o_sample == c_sound:
  124. ret_sample = sample
  125. for sample in self.db:
  126. expired = []
  127. for i, (o_sample, exp_time) in enumerate(sample.old_sounds):
  128. if ((time.time() - exp_time) >= o_sample.get_length()) or (
  129. time.time() - exp_time < 0
  130. ):
  131. expired.append(i)
  132. for i in reversed(expired):
  133. sample.old_sounds.pop(i)
  134. return ret_sample
  135. # if __name__ == "__main__":
  136. # mixer.init()
  137. # chan = mixer.Channel(0)
  138. # coll = SampleCollection("/home/jakob/dev/seba-audio/gincloud/samples/buzzwords")
  139. # chan.queue(coll.next_sound_by_topic("agile"))
  140. # chan.queue(coll.next_sound_by_topic("cloud"))
  141. # print(coll.get_sample_from_channel(chan).path)
  142. # time.sleep(7)
  143. # print(coll.get_sample_from_channel(chan).path)