You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

471 lines
20 KiB

  1. """test_feature_extractors.py - test pipeline feature extractors"""
  2. import sys
  3. from unittest.mock import patch, mock_open, MagicMock #
  4. sys.modules["faster_whisper"] = MagicMock() # mock faster_whisper as it is a slow import
  5. import unittest
  6. import os
  7. import random
  8. import pytest
  9. import pipeline.feature_extractors as extractors
  10. from pipeline.utils import Source, SourceMedia # technically makes this an integration test, but...
  11. from test.mocks import MockFeature, MockInterval
  12. class TestSource():
  13. """Provide utils.Source for testing"""
  14. def one_colour_silent_audio(self):
  15. """Provide a source with a silent mono-colour video"""
  16. TEST_DIR = os.path.dirname(os.path.realpath(__file__))
  17. SAMPLE_VIDEO = f"{TEST_DIR}/sample_videos/test_video_red_silentaudio.mp4" # silent video definitely has no laughter
  18. return Source(source=SAMPLE_VIDEO, path=SAMPLE_VIDEO, provider="test")
  19. class TestSourceMedia():
  20. """Provide utils.SourceMedia for testing"""
  21. def one_colour_silent_audio(self):
  22. """Provide a source with a silent mono-colour video"""
  23. return SourceMedia(sources=[TestSource().one_colour_silent_audio()])
  24. class MockReadJSON():
  25. """Mock read_json"""
  26. def mock_read_json_from_file(self, *args, **kwargs):
  27. """Mock _read_json_from_file()"""
  28. rJSON = [{"interval": {"start": 0.0, "duration": 1.0},
  29. "source": {"source": "test_video_red_silentaudio.mp4",
  30. "path": "test_video_red_silentaudio.mp4",
  31. "provider": "mock"},
  32. "feature_extractor": "MockFeatureExtractor",
  33. "score": 0.5
  34. }]
  35. return rJSON
  36. class TestLaughterFeatureExtractor(unittest.TestCase):
  37. def _mock_laughdetect_callout(self, *args, **kwargs):
  38. """Mock _laughdetect callout
  39. **kwargs:
  40. - n : int >=0, number of laughter instances to generate
  41. Return a list of 2-tuple floats (start, end) representing laughter instances
  42. """
  43. laughs = []
  44. n = kwargs.get("n", 0)
  45. for i in range(n):
  46. laughs.append((i, i+1))
  47. return laughs
  48. def _mock_run_get_output(self, *args, **kwargs) -> str:
  49. """Mock run_get_output callout
  50. kwargs:
  51. - n : int >=0, number of laughter instances to generate
  52. Return a string of laughter instance of the form:
  53. instance: (1.234, 5.678)
  54. """
  55. # TODO: decide if we want non-"instance" output for testing parsing?
  56. # (maybe)
  57. output = []
  58. n = kwargs.get("n", 0)
  59. for i in range(n):
  60. output.append(f"instance: ({i}.{i+1}{i+2}{i+3}, {i+4}.{i+5}{i+6}{i+7})")
  61. return "\n".join(output)
  62. def _sgo5(self, *args, **kwargs):
  63. """Mock run_get_output callout"""
  64. return self._mock_run_get_output(*args, **kwargs, n=5)
  65. """Test LaughterFeatureExtractor"""
  66. def test_init(self):
  67. test_extractor = extractors.LaughterFeatureExtractor()
  68. self.assertTrue(test_extractor)
  69. def test_setup_noinput(self):
  70. """test setup - no input files"""
  71. test_extractor = extractors.LaughterFeatureExtractor()
  72. with self.assertRaises(ValueError):
  73. test_extractor.setup()
  74. # NB test WITH sources implicitly tested in test_extract
  75. @pytest.mark.slow
  76. def test_extract_mocked_nolaughs(self):
  77. """Test extract with mocked laughter detection - no laughs"""
  78. video_source = TestSource().one_colour_silent_audio()
  79. test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source])
  80. test_extractor._laughdetect = self._mock_laughdetect_callout
  81. test_extractor.setup()
  82. test_extractor.run()
  83. test_extractor.teardown()
  84. self.assertEqual(len(test_extractor.features), 0)
  85. def test_extract_mocked_run_get_output_none(self):
  86. """Test extract with mocked laughter detection - no laughs"""
  87. video_source = TestSource().one_colour_silent_audio()
  88. test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source])
  89. test_extractor._run_get_output = self._mock_run_get_output
  90. test_extractor.setup()
  91. test_extractor.run()
  92. test_extractor.teardown()
  93. self.assertEqual(len(test_extractor.features), 0)
  94. def test_extract_mocked_run_get_output_5(self):
  95. """Test extract with mocked laughter detection - 5 laughs"""
  96. video_source = TestSource().one_colour_silent_audio()
  97. test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source])
  98. test_extractor._run_get_output = self._sgo5
  99. test_extractor.setup()
  100. test_extractor.run()
  101. test_extractor.teardown()
  102. self.assertEqual(len(test_extractor.features), 5)
  103. def test_run_get_output(self):
  104. """Test run_get_output"""
  105. video_source = TestSource().one_colour_silent_audio()
  106. test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source])
  107. test_cmd = ["echo", "foo"]
  108. test_extractor.setup()
  109. output = test_extractor._run_get_output(test_cmd)
  110. self.assertEqual(output, "foo\n")
  111. # TODO: add sample video with laughs to test _laughdetect()
  112. class TestRandomFeatureExtractor(unittest.TestCase):
  113. """Test RandomFeatureExtractor"""
  114. def test_init(self):
  115. test_extractor = extractors.RandomFeatureExtractor()
  116. self.assertTrue(test_extractor)
  117. def test_setup_noinput(self):
  118. """test setup - no input files"""
  119. test_extractor = extractors.RandomFeatureExtractor()
  120. with self.assertRaises(ValueError):
  121. test_extractor.setup()
  122. # NB test WITH sources implicitly tested in test_extract
  123. def test_extract_noinput(self):
  124. """Test extract with no input files"""
  125. test_extractor = extractors.RandomFeatureExtractor()
  126. with self.assertRaises(ValueError):
  127. test_extractor.run()
  128. def test_extract(self):
  129. """Test extract with input files"""
  130. video_source = TestSourceMedia().one_colour_silent_audio()
  131. test_extractor = extractors.RandomFeatureExtractor(input_files=video_source)
  132. test_extractor.setup()
  133. test_extractor.run()
  134. test_extractor.teardown()
  135. self.assertTrue(test_extractor.features)
  136. class TestLoudAudioFeatureExtractor(unittest.TestCase):
  137. """Test LoudAudioFeatureExtractor"""
  138. def _mock_loudnorm_5(self, *args, **kwargs):
  139. """Mock _loudnorm
  140. It returns a list of 2-tuple floats (time, loudness) representing loud audio instances
  141. """
  142. return [(0.0, 0.0), (15.0, 1.0), (25.0, 2.0), (35.0, 3.0), (45.0, 4.0)]
  143. def _mock_get_loudnessess(self, *args, length=100, min_loudness=-101, max_loudness=100,
  144. seed=42, **kwargs) -> list:
  145. """Mock _get_loudnesses()
  146. Parameters:
  147. - length : int >=0, number of loudness instances to generate
  148. - min_loudness : int, minimum loudness value (special value: -101 for "-inf")
  149. - max_loudness : int, maximum loudness value
  150. Note that int min/max loudness are divided by float 100
  151. to get the actual loudness value between -1.0 and 1.0
  152. Return a list of 2-tuple floats (timecode, loudness) representing loud audio instances
  153. """
  154. loudnesses = []
  155. random.seed(seed)
  156. for i in range(length):
  157. loudness = random.randint(min_loudness, max_loudness) / 100
  158. if min_loudness == -101:
  159. loudness = "-inf" if loudness == -1.01 else f"{loudness}"
  160. loudnesses.append((float(f"{i*20}.0"), float(loudness)))
  161. return loudnesses
  162. def test_init(self):
  163. video_source = TestSourceMedia().one_colour_silent_audio()
  164. test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source)
  165. self.assertTrue(test_extractor)
  166. def test_init_noinput(self):
  167. """test init - no input files"""
  168. with self.assertRaises(ValueError):
  169. test_extractor = extractors.LoudAudioFeatureExtractor()
  170. def test_extract(self):
  171. """Test extract with input files"""
  172. video_source = TestSourceMedia().one_colour_silent_audio()
  173. test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source)
  174. test_extractor.setup()
  175. test_extractor.run()
  176. test_extractor.teardown()
  177. self.assertEqual(test_extractor.features, [])
  178. def test_extract_mocked_loudnorm_5(self):
  179. """Test extract with mocked loudness detection"""
  180. video_source = TestSourceMedia().one_colour_silent_audio()
  181. test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source)
  182. test_extractor._loudnorm = self._mock_loudnorm_5
  183. test_extractor.setup()
  184. test_extractor.run()
  185. test_extractor.teardown()
  186. self.assertEqual(len(test_extractor.features), 5)
  187. def test_extract_mocked_get_loudnesses(self):
  188. """Test extract with mocked loudness detection - 100 loudnesses generated"""
  189. video_source = TestSourceMedia().one_colour_silent_audio()
  190. test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source, num_features=100)
  191. test_extractor._get_loudnesses = self._mock_get_loudnessess
  192. test_extractor.setup()
  193. test_extractor.run()
  194. test_extractor.teardown()
  195. self.assertEqual(len(test_extractor.features), 100)
  196. def test_keep_num(self):
  197. """Test keep_num correctly keeps 5 / 10"""
  198. min_duration = 0
  199. video_source = TestSourceMedia().one_colour_silent_audio()
  200. with self.subTest("keep 5 (default)"):
  201. test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source,
  202. min_duration=min_duration,
  203. num_features=5)
  204. test_extractor._get_loudnesses = self._mock_get_loudnessess
  205. test_extractor.setup()
  206. test_extractor.run()
  207. test_extractor.teardown()
  208. self.assertEqual(len(test_extractor.features), 5)
  209. with self.subTest("keep 10"):
  210. test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source,
  211. min_duration=min_duration,
  212. num_features=10)
  213. test_extractor._get_loudnesses = self._mock_get_loudnessess
  214. test_extractor.setup()
  215. test_extractor.run()
  216. test_extractor.teardown()
  217. self.assertEqual(len(test_extractor.features), 10)
  218. # test with min_duration
  219. min_duration = 100
  220. with self.subTest("min_duration"):
  221. test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source,
  222. min_duration=min_duration,
  223. num_features=10)
  224. test_extractor._get_loudnesses = self._mock_get_loudnessess
  225. test_extractor.setup()
  226. test_extractor.run()
  227. test_extractor.teardown()
  228. for feature in test_extractor.features:
  229. self.assertGreaterEqual(feature.interval.duration, min_duration)
  230. # test trim_overlap
  231. with self.subTest("trim_overlap"):
  232. test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source,
  233. min_duration=0,
  234. num_features=10)
  235. feature1 = MockFeature(interval=MockInterval(start=25, end=75))
  236. # mock feature1's interval .overlaps() method to always return True
  237. feature1.interval.overlaps = lambda x: True
  238. feature2 = MockFeature(interval=MockInterval(start=50, end=100)) # overlap
  239. feature2.interval.overlaps = lambda x: True
  240. features = [feature1, feature2]
  241. keep_features = test_extractor._keep_num(features,num=5, trim_overlap=True)
  242. self.assertEqual(len(keep_features), 1)
  243. # TODO: add sample video with loud audio to test _loudnessdetect()
  244. class TestVideoActivityFeatureExtractor(unittest.TestCase):
  245. """Test VideoActivityFeatureExtractor"""
  246. def test_init(self):
  247. video_source = TestSourceMedia().one_colour_silent_audio()
  248. test_extractor = extractors.VideoActivityFeatureExtractor(input_files=video_source)
  249. self.assertTrue(test_extractor)
  250. def test_init_noinput(self):
  251. """test init - no input files"""
  252. with self.assertRaises(ValueError):
  253. test_extractor = extractors.VideoActivityFeatureExtractor()
  254. def test_extract(self):
  255. """Test extract with basic input file runs with no errors"""
  256. num_features = 50
  257. min_duration = 0
  258. video_source = TestSourceMedia().one_colour_silent_audio()
  259. test_extractor = extractors.VideoActivityFeatureExtractor(input_files=video_source,
  260. num_features=num_features,
  261. min_duration=min_duration)
  262. test_extractor.setup()
  263. test_extractor.run()
  264. test_extractor.teardown()
  265. self.assertTrue(test_extractor.features)
  266. def test_keep_num(self):
  267. """Test keep_num keeps 0"""
  268. num_features = 1
  269. min_duration = 0
  270. video_source = TestSourceMedia().one_colour_silent_audio()
  271. test_extractor = extractors.VideoActivityFeatureExtractor(input_files=video_source,
  272. num_features=num_features,
  273. min_duration=min_duration)
  274. test_extractor.setup()
  275. test_extractor.run()
  276. test_extractor.teardown()
  277. self.assertEqual(len(test_extractor.features), num_features)
  278. class TestJSONFeatureExtractor(unittest.TestCase):
  279. """Test JSONFeatureExtractor"""
  280. def test_init(self):
  281. video_source = TestSourceMedia().one_colour_silent_audio()
  282. test_extractor = extractors.JSONFeatureExtractor(input_files=video_source)
  283. self.assertTrue(test_extractor)
  284. def test_init_noinput(self):
  285. """test init - no input files"""
  286. with self.assertRaises(ValueError):
  287. test_extractor = extractors.JSONFeatureExtractor()
  288. def test_extract(self):
  289. """Test extract with basic input file runs with no errors"""
  290. video_source = TestSourceMedia().one_colour_silent_audio()
  291. test_extractor = extractors.JSONFeatureExtractor(input_files=video_source)
  292. # mock _read_json_from_file
  293. test_extractor._read_json_from_file = MockReadJSON().mock_read_json_from_file
  294. test_extractor.setup()
  295. test_extractor.run()
  296. test_extractor.teardown()
  297. self.assertTrue(test_extractor.features)
  298. def test_read_json_from_file(self):
  299. """Test _read_json_from_file"""
  300. video_source = TestSourceMedia().one_colour_silent_audio()
  301. test_extractor = extractors.JSONFeatureExtractor(input_files=video_source)
  302. m = unittest.mock.mock_open(read_data='[{"foo": "bar"}]')
  303. with unittest.mock.patch("builtins.open", m):
  304. test_extractor._read_json_from_file("foo.json")
  305. class TestWordFeatureExtractor(unittest.TestCase):
  306. """Test WordFeatureExtractor"""
  307. @classmethod
  308. def setUpClass(cls):
  309. sys.modules["faster_whisper"] = MagicMock()
  310. _MOCK_SENTENCE = "the quick brown fox jumps over the lazy dog".split()
  311. class MockSegment():
  312. """Mock Segment -- has starte, end and text attributes"""
  313. def __init__(self, start, end, text):
  314. self.start = start
  315. self.end = end
  316. self.text = text
  317. def mock_transcribe(self, *args, **kwargs):
  318. """Mock for WhisperModel.model.transcribe
  319. returns a 2-tuple:
  320. - list of segments
  321. + segment = start, end, text
  322. - info = language, language_probability
  323. We will mock the segments- this provides 9 segments for the sentence:
  324. "the quick brown fox jumps over the lazy dog"
  325. """
  326. segments = []
  327. for i in range(len(self._MOCK_SENTENCE)):
  328. segments.append(self.MockSegment(i, i+1, self._MOCK_SENTENCE[i]))
  329. return segments, {"language": "en", "language_probability": 0.9}
  330. def test_basic_init(self):
  331. video_source = TestSourceMedia().one_colour_silent_audio()
  332. test_extractor = extractors.WordFeatureExtractor(input_files=video_source)
  333. self.assertTrue(test_extractor)
  334. def test_init_no_input_videos(self):
  335. """test init - no input files"""
  336. with self.assertRaises(ValueError):
  337. test_extractor = extractors.WordFeatureExtractor()
  338. def test_extract_no_words_supplied(self):
  339. """Test extract with basic input file but no words specirfied returns zero features"""
  340. video_source = TestSourceMedia().one_colour_silent_audio()
  341. test_extractor = extractors.WordFeatureExtractor(input_files=video_source)
  342. test_extractor.setup()
  343. test_extractor.run()
  344. test_extractor.teardown()
  345. self.assertEqual(test_extractor.features, [])
  346. def test_extract_mocked_transcribe_matching_words(self):
  347. """Mock out the actual call to transcribe but match all words in the sentence"""
  348. video_source = TestSourceMedia().one_colour_silent_audio()
  349. test_extractor = extractors.WordFeatureExtractor(input_files=video_source)
  350. # mock _transcribe and mock out model and batched pipeline for speed
  351. test_extractor._transcribe = self.mock_transcribe
  352. test_extractor._model = MagicMock()
  353. test_extractor._batched_model = MagicMock()
  354. # set up and run the extractor
  355. test_extractor.setup(words=self._MOCK_SENTENCE)
  356. with self.subTest("batched"):
  357. test_extractor.run()
  358. test_extractor.teardown()
  359. self.assertEqual(len(test_extractor.features), 9)
  360. test_extractor.features = [] # reset features
  361. with self.subTest("non-batched"):
  362. test_extractor.DEFAULT_PIPELINE_TYPE = "non-batched"
  363. test_extractor.run()
  364. test_extractor.teardown()
  365. self.assertEqual(len(test_extractor.features), 9)
  366. def test_extract_mocked_transcribe_no_matching_words(self):
  367. """Mock out the actual call to transcribe but match no words in the sentence"""
  368. video_source = TestSourceMedia().one_colour_silent_audio()
  369. test_extractor = extractors.WordFeatureExtractor(input_files=video_source)
  370. # mock _transcribe and mock out model and batched pipeline for speed
  371. test_extractor._transcribe = self.mock_transcribe
  372. test_extractor._model = MagicMock()
  373. test_extractor._batched_model = MagicMock()
  374. # set up and run the extractor
  375. test_extractor.setup(words=["nonexistentword"])
  376. test_extractor.run()
  377. test_extractor.teardown()
  378. self.assertEqual(len(test_extractor.features), 0)
  379. def test_extract_mocked_transcribe_some_matching_words(self):
  380. """Mock out the actual call to transcribe but match some words in the sentence"""
  381. video_source = TestSourceMedia().one_colour_silent_audio()
  382. test_extractor = extractors.WordFeatureExtractor(input_files=video_source)
  383. # mock _transcribe and mock out model and batched pipeline for speed
  384. test_extractor._transcribe = self.mock_transcribe
  385. test_extractor._model = MagicMock()
  386. test_extractor._batched_model = MagicMock()
  387. # set up and run the extractor
  388. test_extractor.setup(words=["quick", "jumps", "dog"])
  389. test_extractor.run()
  390. test_extractor.teardown()
  391. self.assertEqual(len(test_extractor.features), 3)
  392. def test_transcribe(self):
  393. """Test _transcribe -- it calls model.transcribe"""
  394. video_source = TestSourceMedia().one_colour_silent_audio()
  395. test_extractor = extractors.WordFeatureExtractor(input_files=video_source)
  396. mock_model = MagicMock()
  397. test_extractor._transcribe(mock_model, "test.mp4")
  398. mock_model.transcribe.assert_called_once()