|
- """test_feature_extractors.py - test pipeline feature extractors"""
- import sys
- from unittest.mock import patch, mock_open, MagicMock #
- sys.modules["faster_whisper"] = MagicMock() # mock faster_whisper as it is a slow import
-
- import unittest
- import os
- import random
- import pytest
- import pipeline.feature_extractors as extractors
-
- from pipeline.utils import Source, SourceMedia # technically makes this an integration test, but...
-
class TestSource:
    """Factory for utils.Source objects used as test fixtures."""

    def one_colour_silent_audio(self):
        """Return a Source for the silent, single-colour sample video.

        The video is looked up in the ``sample_videos`` directory that sits
        next to this test file.
        """
        here = os.path.dirname(os.path.realpath(__file__))
        # A silent video definitely contains no laughter.
        video_path = f"{here}/sample_videos/test_video_red_silentaudio.mp4"
        return Source(source=video_path, path=video_path, provider="test")
-
class TestSourceMedia:
    """Factory for utils.SourceMedia objects used as test fixtures."""

    def one_colour_silent_audio(self):
        """Return a SourceMedia wrapping the single silent, single-colour source."""
        silent_source = TestSource().one_colour_silent_audio()
        return SourceMedia(sources=[silent_source])
-
class MockReadJSON:
    """Provider of a stand-in for the JSON extractor's file reader."""

    def mock_read_json_from_file(self, *args, **kwargs):
        """Replacement for _read_json_from_file(); all arguments are ignored.

        Returns a freshly built one-element list holding a canned feature
        record (interval, source, feature_extractor, score).
        """
        source = {
            "source": "test_video_red_silentaudio.mp4",
            "path": "test_video_red_silentaudio.mp4",
            "provider": "mock",
        }
        feature = {
            "interval": {"start": 0.0, "duration": 1.0},
            "source": source,
            "feature_extractor": "MockFeatureExtractor",
            "score": 0.5,
        }
        return [feature]
-
class TestLaughterFeatureExtractor(unittest.TestCase):
    """Test LaughterFeatureExtractor"""

    def _mock_laughdetect_callout(self, *args, **kwargs):
        """Mock _laughdetect callout

        Positional arguments are accepted and ignored.

        **kwargs:
         - n : int >=0, number of laughter instances to generate (default 0)

        Return a list of n (start, end) 2-tuples -- (0, 1), (1, 2), ... --
        representing laughter instances
        """
        n = kwargs.get("n", 0)
        return [(i, i + 1) for i in range(n)]

    def _mock_run_get_output(self, *args, **kwargs) -> str:
        """Mock run_get_output callout

        Positional arguments are accepted and ignored.

        kwargs:
         - n : int >=0, number of laughter instances to generate (default 0)

        Return a newline-joined string with one line per laughter instance,
        each of the form:
            instance: (1.234, 5.678)
        With n == 0 the result is the empty string.
        """
        # TODO: decide if we want non-"instance" output for testing parsing?
        # (maybe)
        n = kwargs.get("n", 0)
        return "\n".join(
            f"instance: ({i}.{i+1}{i+2}{i+3}, {i+4}.{i+5}{i+6}{i+7})"
            for i in range(n)
        )

    def _sgo5(self, *args, **kwargs):
        """Mock run_get_output callout that always reports 5 laughter instances"""
        return self._mock_run_get_output(*args, **kwargs, n=5)

    def test_init(self):
        """test init - extractor can be constructed without arguments"""
        test_extractor = extractors.LaughterFeatureExtractor()
        self.assertTrue(test_extractor)

    def test_setup_noinput(self):
        """test setup - no input files"""
        test_extractor = extractors.LaughterFeatureExtractor()
        with self.assertRaises(ValueError):
            test_extractor.setup()
        # NB test WITH sources implicitly tested in test_extract

    @pytest.mark.slow
    def test_extract_mocked_nolaughs(self):
        """Test extract with mocked laughter detection - no laughs"""
        video_source = TestSource().one_colour_silent_audio()
        test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source])
        test_extractor._laughdetect = self._mock_laughdetect_callout
        test_extractor.setup()
        test_extractor.run()
        test_extractor.teardown()
        self.assertEqual(len(test_extractor.features), 0)

    def test_extract_mocked_run_get_output_none(self):
        """Test extract with mocked run_get_output - empty output, so no laughs"""
        video_source = TestSource().one_colour_silent_audio()
        test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source])
        test_extractor._run_get_output = self._mock_run_get_output
        test_extractor.setup()
        test_extractor.run()
        test_extractor.teardown()
        self.assertEqual(len(test_extractor.features), 0)

    def test_extract_mocked_run_get_output_5(self):
        """Test extract with mocked run_get_output - 5 laughs"""
        video_source = TestSource().one_colour_silent_audio()
        test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source])
        test_extractor._run_get_output = self._sgo5
        test_extractor.setup()
        test_extractor.run()
        test_extractor.teardown()
        self.assertEqual(len(test_extractor.features), 5)

    def test_run_get_output(self):
        """Test the real _run_get_output by echoing a string"""
        video_source = TestSource().one_colour_silent_audio()
        test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source])
        test_cmd = ["echo", "foo"]
        test_extractor.setup()
        output = test_extractor._run_get_output(test_cmd)
        self.assertEqual(output, "foo\n")

    # TODO: add sample video with laughs to test _laughdetect()
-
class TestRandomFeatureExtractor(unittest.TestCase):
    """Unit tests covering RandomFeatureExtractor."""

    def test_init(self):
        """An extractor built without arguments is truthy."""
        self.assertTrue(extractors.RandomFeatureExtractor())

    def test_setup_noinput(self):
        """setup() raises ValueError when no input files were supplied."""
        extractor = extractors.RandomFeatureExtractor()
        self.assertRaises(ValueError, extractor.setup)
        # NB: setup WITH sources is exercised implicitly by test_extract.

    def test_extract_noinput(self):
        """run() raises ValueError when no input files were supplied."""
        extractor = extractors.RandomFeatureExtractor()
        self.assertRaises(ValueError, extractor.run)

    def test_extract(self):
        """A full setup/run/teardown cycle on the sample media yields features."""
        media = TestSourceMedia().one_colour_silent_audio()
        extractor = extractors.RandomFeatureExtractor(input_files=media)
        extractor.setup()
        extractor.run()
        extractor.teardown()
        self.assertTrue(extractor.features)
-
class TestLoudAudioFeatureExtractor(unittest.TestCase):
    """Test LoudAudioFeatureExtractor"""

    def _mock_loudnorm(self, *args, **kwargs):
        """Mock _loudnorm

        All arguments are accepted and ignored.

        It returns a list of 2-tuple floats (time, loudness) representing loud audio instances
        """
        return [(0.0, 0.0), (1.0, 1.0), (2.0, 2.0), (3.0, 3.0), (4.0, 4.0)]

    def _mock_get_loudnessess(self, *args, length=100, min_loudness=-101, max_loudness=100,
                              seed=42, **kwargs) -> list:
        """Mock _get_loudnesses()

        Extra positional and keyword arguments are accepted and ignored.

        Parameters:
         - length : int >=0, number of loudness instances to generate
         - min_loudness : int, minimum loudness value (special value: -101 for "-inf")
         - max_loudness : int, maximum loudness value
         - seed : seed for the private random generator, so that the output
                  is reproducible for a given set of arguments

        Note that int min/max loudness are divided by float 100
        to get the actual loudness value between -1.0 and 1.0

        Return a list of 2-tuple floats (timecode, loudness) representing loud audio instances
        """
        # Use a private generator rather than random.seed(): reseeding the
        # module-level RNG would leak deterministic state into every test
        # that runs afterwards.  random.Random(seed) yields the same sequence
        # the module-level functions produce after random.seed(seed).
        rng = random.Random(seed)

        # A draw of -101 only stands for "-inf" when -101 was requested as
        # the lower bound; with any other bound it is an ordinary value.
        inf_sentinel_enabled = min_loudness == -101

        loudnesses = []
        for i in range(length):
            raw = rng.randint(min_loudness, max_loudness)
            if inf_sentinel_enabled and raw == -101:
                loudness = float("-inf")
            else:
                loudness = raw / 100
            loudnesses.append((float(i), loudness))

        return loudnesses

    def test_init(self):
        """test init - with input files"""
        video_source = TestSourceMedia().one_colour_silent_audio()
        test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source)
        self.assertTrue(test_extractor)

    def test_init_noinput(self):
        """test init - no input files"""
        with self.assertRaises(ValueError):
            extractors.LoudAudioFeatureExtractor()

    def test_extract(self):
        """Test extract with input files - the silent sample yields no features"""
        video_source = TestSourceMedia().one_colour_silent_audio()
        test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source)
        test_extractor.setup()
        test_extractor.run()
        test_extractor.teardown()
        self.assertEqual(test_extractor.features, [])

    def test_extract_mocked_loudnorm(self):
        """Test extract with mocked _loudnorm (5 canned instances)"""
        video_source = TestSourceMedia().one_colour_silent_audio()
        test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source)
        test_extractor._loudnorm = self._mock_loudnorm
        test_extractor.setup()
        test_extractor.run()
        test_extractor.teardown()
        self.assertEqual(len(test_extractor.features), 5)

    def test_extract_mocked_get_loudnesses(self):
        """Test extract with mocked _get_loudnesses (100 generated instances)"""
        video_source = TestSourceMedia().one_colour_silent_audio()
        test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source)
        test_extractor._get_loudnesses = self._mock_get_loudnessess
        test_extractor.setup()
        test_extractor.run()
        test_extractor.teardown()
        self.assertEqual(len(test_extractor.features), 100)

    # TODO: add sample video with loud audio to test _loudnessdetect()
-
class TestVideoActivityFeatureExtractor(unittest.TestCase):
    """Unit tests covering VideoActivityFeatureExtractor."""

    def test_init(self):
        """An extractor built with input files is truthy."""
        media = TestSourceMedia().one_colour_silent_audio()
        self.assertTrue(extractors.VideoActivityFeatureExtractor(input_files=media))

    def test_init_noinput(self):
        """Constructing the extractor without input files raises ValueError."""
        self.assertRaises(ValueError, extractors.VideoActivityFeatureExtractor)

    def test_extract(self):
        """A full setup/run/teardown cycle on the basic input yields features."""
        media = TestSourceMedia().one_colour_silent_audio()
        extractor = extractors.VideoActivityFeatureExtractor(input_files=media)
        extractor.setup()
        extractor.run()
        extractor.teardown()
        self.assertTrue(extractor.features)

    # TODO: add sample video with activity to test _activitydetect()
-
class TestJSONFeatureExtractor(unittest.TestCase):
    """Unit tests covering JSONFeatureExtractor."""

    def test_init(self):
        """An extractor built with input files is truthy."""
        media = TestSourceMedia().one_colour_silent_audio()
        self.assertTrue(extractors.JSONFeatureExtractor(input_files=media))

    def test_init_noinput(self):
        """Constructing the extractor without input files raises ValueError."""
        self.assertRaises(ValueError, extractors.JSONFeatureExtractor)

    def test_extract(self):
        """A full cycle with the JSON reader mocked out yields features."""
        media = TestSourceMedia().one_colour_silent_audio()
        extractor = extractors.JSONFeatureExtractor(input_files=media)
        # Swap the file reader for the canned-record mock.
        extractor._read_json_from_file = MockReadJSON().mock_read_json_from_file
        extractor.setup()
        extractor.run()
        extractor.teardown()
        self.assertTrue(extractor.features)

    def test_read_json_from_file(self):
        """_read_json_from_file runs without error against a mocked open()."""
        media = TestSourceMedia().one_colour_silent_audio()
        extractor = extractors.JSONFeatureExtractor(input_files=media)
        fake_open = unittest.mock.mock_open(read_data='[{"foo": "bar"}]')
        # builtins.open is patched so that no real file is needed.
        with unittest.mock.patch("builtins.open", fake_open):
            extractor._read_json_from_file("foo.json")
-
-
class TestWordFeatureExtractor(unittest.TestCase):
    """Unit tests covering WordFeatureExtractor."""

    _MOCK_SENTENCE = "the quick brown fox jumps over the lazy dog".split()

    class MockSegment:
        """Stand-in transcription segment exposing start, end and text."""

        def __init__(self, start, end, text):
            self.start, self.end, self.text = start, end, text

    @classmethod
    def setUpClass(cls):
        """Install a MagicMock in place of the faster_whisper module."""
        sys.modules["faster_whisper"] = MagicMock()

    def mock_transcribe(self, *args, **kwargs):
        """Replacement for WhisperModel.model.transcribe; arguments are ignored.

        Returns a 2-tuple:
         - a list of segments, one per word of the sentence
           "the quick brown fox jumps over the lazy dog" (9 in total),
           where word number i spans i .. i+1
         - an info dict with language and language_probability
        """
        segments = [
            self.MockSegment(index, index + 1, word)
            for index, word in enumerate(self._MOCK_SENTENCE)
        ]
        info = {"language": "en", "language_probability": 0.9}
        return segments, info

    def _features_for_words(self, words):
        """Run an extractor with transcription mocked out; return its features."""
        media = TestSourceMedia().one_colour_silent_audio()
        extractor = extractors.WordFeatureExtractor(input_files=media)
        # Mock _transcribe, and mock out model and batched pipeline for speed.
        extractor._transcribe = self.mock_transcribe
        extractor._model = MagicMock()
        extractor._batched_model = MagicMock()
        # Set up and run the extractor.
        extractor.setup(words=words)
        extractor.run()
        extractor.teardown()
        return extractor.features

    def test_basic_init(self):
        """An extractor built with input files is truthy."""
        media = TestSourceMedia().one_colour_silent_audio()
        self.assertTrue(extractors.WordFeatureExtractor(input_files=media))

    def test_init_no_input_videos(self):
        """Constructing the extractor without input files raises ValueError."""
        self.assertRaises(ValueError, extractors.WordFeatureExtractor)

    def test_extract_no_words_supplied(self):
        """A basic input file with no words specified returns zero features."""
        media = TestSourceMedia().one_colour_silent_audio()
        extractor = extractors.WordFeatureExtractor(input_files=media)
        extractor.setup()
        extractor.run()
        extractor.teardown()
        self.assertEqual(extractor.features, [])

    def test_extract_mocked_transcribe_matching_words(self):
        """With transcribe mocked, asking for every word gives 9 features."""
        features = self._features_for_words(self._MOCK_SENTENCE)
        self.assertEqual(len(features), 9)

    def test_extract_mocked_transcribe_no_matching_words(self):
        """With transcribe mocked, asking for an absent word gives no features."""
        features = self._features_for_words(["nonexistentword"])
        self.assertEqual(len(features), 0)

    def test_extract_mocked_transcribe_some_matching_words(self):
        """With transcribe mocked, asking for three present words gives 3 features."""
        features = self._features_for_words(["quick", "jumps", "dog"])
        self.assertEqual(len(features), 3)
|