"""test_feature_extractors.py - test pipeline feature extractors""" import sys from unittest.mock import patch, mock_open, MagicMock # sys.modules["faster_whisper"] = MagicMock() # mock faster_whisper as it is a slow import import unittest import os import random import pytest import pipeline.feature_extractors as extractors from pipeline.utils import Source, SourceMedia # technically makes this an integration test, but... class TestSource(): """Provide utils.Source for testing""" def one_colour_silent_audio(self): """Provide a source with a silent mono-colour video""" TEST_DIR = os.path.dirname(os.path.realpath(__file__)) SAMPLE_VIDEO = f"{TEST_DIR}/sample_videos/test_video_red_silentaudio.mp4" # silent video definitely has no laughter return Source(source=SAMPLE_VIDEO, path=SAMPLE_VIDEO, provider="test") class TestSourceMedia(): """Provide utils.SourceMedia for testing""" def one_colour_silent_audio(self): """Provide a source with a silent mono-colour video""" return SourceMedia(sources=[TestSource().one_colour_silent_audio()]) class MockReadJSON(): """Mock read_json""" def mock_read_json_from_file(self, *args, **kwargs): """Mock _read_json_from_file()""" rJSON = [{"interval": {"start": 0.0, "duration": 1.0}, "source": {"source": "test_video_red_silentaudio.mp4", "path": "test_video_red_silentaudio.mp4", "provider": "mock"}, "feature_extractor": "MockFeatureExtractor", "score": 0.5 }] return rJSON class TestLaughterFeatureExtractor(unittest.TestCase): def _mock_laughdetect_callout(self, *args, **kwargs): """Mock _laughdetect callout **kwargs: - n : int >=0, number of laughter instances to generate Return a list of 2-tuple floats (start, end) representing laughter instances """ laughs = [] n = kwargs.get("n", 0) for i in range(n): laughs.append((i, i+1)) return laughs def _mock_run_get_output(self, *args, **kwargs) -> str: """Mock run_get_output callout kwargs: - n : int >=0, number of laughter instances to generate Return a string of laughter instance of the form: instance: (1.234, 5.678) """ # TODO: decide if we want non-"instance" output for testing parsing? # (maybe) output = [] n = kwargs.get("n", 0) for i in range(n): output.append(f"instance: ({i}.{i+1}{i+2}{i+3}, {i+4}.{i+5}{i+6}{i+7})") return "\n".join(output) def _sgo5(self, *args, **kwargs): """Mock run_get_output callout""" return self._mock_run_get_output(*args, **kwargs, n=5) """Test LaughterFeatureExtractor""" def test_init(self): test_extractor = extractors.LaughterFeatureExtractor() self.assertTrue(test_extractor) def test_setup_noinput(self): """test setup - no input files""" test_extractor = extractors.LaughterFeatureExtractor() with self.assertRaises(ValueError): test_extractor.setup() # NB test WITH sources implicitly tested in test_extract @pytest.mark.slow def test_extract_mocked_nolaughs(self): """Test extract with mocked laughter detection - no laughs""" video_source = TestSource().one_colour_silent_audio() test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source]) test_extractor._laughdetect = self._mock_laughdetect_callout test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 0) def test_extract_mocked_run_get_output_none(self): """Test extract with mocked laughter detection - no laughs""" video_source = TestSource().one_colour_silent_audio() test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source]) test_extractor._run_get_output = self._mock_run_get_output test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 0) def test_extract_mocked_run_get_output_5(self): """Test extract with mocked laughter detection - 5 laughs""" video_source = TestSource().one_colour_silent_audio() test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source]) test_extractor._run_get_output = self._sgo5 test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 5) def test_run_get_output(self): """Test run_get_output""" video_source = TestSource().one_colour_silent_audio() test_extractor = extractors.LaughterFeatureExtractor(input_files=[video_source]) test_cmd = ["echo", "foo"] test_extractor.setup() output = test_extractor._run_get_output(test_cmd) self.assertEqual(output, "foo\n") # TODO: add sample video with laughs to test _laughdetect() class TestRandomFeatureExtractor(unittest.TestCase): """Test RandomFeatureExtractor""" def test_init(self): test_extractor = extractors.RandomFeatureExtractor() self.assertTrue(test_extractor) def test_setup_noinput(self): """test setup - no input files""" test_extractor = extractors.RandomFeatureExtractor() with self.assertRaises(ValueError): test_extractor.setup() # NB test WITH sources implicitly tested in test_extract def test_extract_noinput(self): """Test extract with no input files""" test_extractor = extractors.RandomFeatureExtractor() with self.assertRaises(ValueError): test_extractor.run() def test_extract(self): """Test extract with input files""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.RandomFeatureExtractor(input_files=video_source) test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertTrue(test_extractor.features) class TestLoudAudioFeatureExtractor(unittest.TestCase): """Test LoudAudioFeatureExtractor""" def _mock_loudnorm_5(self, *args, **kwargs): """Mock _loudnorm It returns a list of 2-tuple floats (time, loudness) representing loud audio instances """ return [(0.0, 0.0), (15.0, 1.0), (25.0, 2.0), (35.0, 3.0), (45.0, 4.0)] def _mock_get_loudnessess(self, *args, length=100, min_loudness=-101, max_loudness=100, seed=42, **kwargs) -> list: """Mock _get_loudnesses() Parameters: - length : int >=0, number of loudness instances to generate - min_loudness : int, minimum loudness value (special value: -101 for "-inf") - max_loudness : int, maximum loudness value Note that int min/max loudness are divided by float 100 to get the actual loudness value between -1.0 and 1.0 Return a list of 2-tuple floats (timecode, loudness) representing loud audio instances """ loudnesses = [] random.seed(seed) for i in range(length): loudness = random.randint(min_loudness, max_loudness) / 100 if min_loudness == -101: loudness = "-inf" if loudness == -1.01 else f"{loudness}" loudnesses.append((float(f"{i*20}.0"), float(loudness))) return loudnesses def test_init(self): video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source) self.assertTrue(test_extractor) def test_init_noinput(self): """test init - no input files""" with self.assertRaises(ValueError): test_extractor = extractors.LoudAudioFeatureExtractor() def test_extract(self): """Test extract with input files""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source) test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(test_extractor.features, []) def test_extract_mocked_loudnorm_5(self): """Test extract with mocked loudness detection""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source) test_extractor._loudnorm = self._mock_loudnorm_5 test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 5) def test_extract_mocked_get_loudnesses(self): """Test extract with mocked loudness detection - 100 loudnesses generated""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source, num_features=100) test_extractor._get_loudnesses = self._mock_get_loudnessess test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 100) def test_keep_num(self): """Test keep_num correctly keeps 5 / 10""" min_duration = 0 video_source = TestSourceMedia().one_colour_silent_audio() with self.subTest("keep 5 (default)"): test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source, min_duration=min_duration, num_features=5) test_extractor._get_loudnesses = self._mock_get_loudnessess test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 5) with self.subTest("keep 10"): test_extractor = extractors.LoudAudioFeatureExtractor(input_files=video_source, min_duration=min_duration, num_features=10) test_extractor._get_loudnesses = self._mock_get_loudnessess test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 10) # TODO: add sample video with loud audio to test _loudnessdetect() class TestVideoActivityFeatureExtractor(unittest.TestCase): """Test VideoActivityFeatureExtractor""" def test_init(self): video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.VideoActivityFeatureExtractor(input_files=video_source) self.assertTrue(test_extractor) def test_init_noinput(self): """test init - no input files""" with self.assertRaises(ValueError): test_extractor = extractors.VideoActivityFeatureExtractor() def test_extract(self): """Test extract with basic input file runs with no errors""" num_features = 50 min_duration = 0 video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.VideoActivityFeatureExtractor(input_files=video_source, num_features=num_features, min_duration=min_duration) test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertTrue(test_extractor.features) # TODO: add sample video with activity to test _activitydetect() class TestJSONFeatureExtractor(unittest.TestCase): """Test JSONFeatureExtractor""" def test_init(self): video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.JSONFeatureExtractor(input_files=video_source) self.assertTrue(test_extractor) def test_init_noinput(self): """test init - no input files""" with self.assertRaises(ValueError): test_extractor = extractors.JSONFeatureExtractor() def test_extract(self): """Test extract with basic input file runs with no errors""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.JSONFeatureExtractor(input_files=video_source) # mock _read_json_from_file test_extractor._read_json_from_file = MockReadJSON().mock_read_json_from_file test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertTrue(test_extractor.features) def test_read_json_from_file(self): """Test _read_json_from_file""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.JSONFeatureExtractor(input_files=video_source) m = unittest.mock.mock_open(read_data='[{"foo": "bar"}]') with unittest.mock.patch("builtins.open", m): test_extractor._read_json_from_file("foo.json") class TestWordFeatureExtractor(unittest.TestCase): """Test WordFeatureExtractor""" @classmethod def setUpClass(cls): sys.modules["faster_whisper"] = MagicMock() _MOCK_SENTENCE = "the quick brown fox jumps over the lazy dog".split() class MockSegment(): """Mock Segment -- has starte, end and text attributes""" def __init__(self, start, end, text): self.start = start self.end = end self.text = text def mock_transcribe(self, *args, **kwargs): """Mock for WhisperModel.model.transcribe returns a 2-tuple: - list of segments + segment = start, end, text - info = language, language_probability We will mock the segments- this provides 9 segments for the sentence: "the quick brown fox jumps over the lazy dog" """ segments = [] for i in range(len(self._MOCK_SENTENCE)): segments.append(self.MockSegment(i, i+1, self._MOCK_SENTENCE[i])) return segments, {"language": "en", "language_probability": 0.9} def test_basic_init(self): video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.WordFeatureExtractor(input_files=video_source) self.assertTrue(test_extractor) def test_init_no_input_videos(self): """test init - no input files""" with self.assertRaises(ValueError): test_extractor = extractors.WordFeatureExtractor() def test_extract_no_words_supplied(self): """Test extract with basic input file but no words specirfied returns zero features""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.WordFeatureExtractor(input_files=video_source) test_extractor.setup() test_extractor.run() test_extractor.teardown() self.assertEqual(test_extractor.features, []) def test_extract_mocked_transcribe_matching_words(self): """Mock out the actual call to transcribe but match all words in the sentence""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.WordFeatureExtractor(input_files=video_source) # mock _transcribe and mock out model and batched pipeline for speed test_extractor._transcribe = self.mock_transcribe test_extractor._model = MagicMock() test_extractor._batched_model = MagicMock() # set up and run the extractor test_extractor.setup(words=self._MOCK_SENTENCE) test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 9) def test_extract_mocked_transcribe_no_matching_words(self): """Mock out the actual call to transcribe but match no words in the sentence""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.WordFeatureExtractor(input_files=video_source) # mock _transcribe and mock out model and batched pipeline for speed test_extractor._transcribe = self.mock_transcribe test_extractor._model = MagicMock() test_extractor._batched_model = MagicMock() # set up and run the extractor test_extractor.setup(words=["nonexistentword"]) test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 0) def test_extract_mocked_transcribe_some_matching_words(self): """Mock out the actual call to transcribe but match some words in the sentence""" video_source = TestSourceMedia().one_colour_silent_audio() test_extractor = extractors.WordFeatureExtractor(input_files=video_source) # mock _transcribe and mock out model and batched pipeline for speed test_extractor._transcribe = self.mock_transcribe test_extractor._model = MagicMock() test_extractor._batched_model = MagicMock() # set up and run the extractor test_extractor.setup(words=["quick", "jumps", "dog"]) test_extractor.run() test_extractor.teardown() self.assertEqual(len(test_extractor.features), 3)