from abc import ABC import json import logging import os import random import subprocess from ast import literal_eval from pipeline.utils import SourceMedia, Source, Feature, Interval # for loudness detection import soundfile import pyloudnorm # for word detection from faster_whisper import WhisperModel, BatchedInferencePipeline logger = logging.getLogger(__name__) class FeatureExtractor(ABC): """Feature extractor interface.""" def _run_get_output(self, cmd: list, cwd:str=".") -> str: """Run a command and return the output as a string Defined to be mocked out in tests via unittest.mock.patch """ return subprocess.run(cmd, stdout=subprocess.PIPE, cwd=cwd).stdout.decode("utf-8") def setup(self): """Setup the feature extractor -- validate input files & config""" def run(self): """Run the feature extractor -- extract features""" def teardown(self): """Teardown the feature extractor -- clean up any temporary files created during setup""" class LaughterFeatureExtractor(FeatureExtractor): """Feature extractor for laughter detection. This class is responsible for extracting features corresponding to laughter in media files. Uses jrgillick's laughter-detection library. Here: setup() (not needed for laughter-detection, as it can work with AV files directly) run() is used to extract features from the audio using jrgillick's laughter-detection teardown() (not needed) @see: https://github.com/jrgillick/laughter-detection for the laughter-detection library """ _PREPEND_TIME = 7.0 # seconds before the laugh to capture whatever was funny _APPEND_TIME = 3.0 # seconds after the laugh to capture the reaction _CONFIG_LAUGH_DETECTOR_DIR = "/home/robert/mounts/980data/code/laughter-detection/" def __init__(self, input_files=None, config=None): """It is expected that input_files is a SourceMedia object""" self.input_files = input_files self.config = config self.features = [] def _laughdetect(self, audio_file, laugh_detector_dir=_CONFIG_LAUGH_DETECTOR_DIR) -> list: """Run laughter detection on the audio file Returns a list of 2-tuples, each representing a laugh instance in the audio file in the format: (start, end) in seconds """ laugh_detector_script = "segment_laughter.py" # fake output for testing # laugh_detector_path = "tests/fake_segment_laughter.py" laugh_detector_cmd = ["python", f"{laugh_detector_dir}{laugh_detector_script}", f"--input_audio_file={audio_file}"] # run command, capture output, ignore exit status # use self._run_get_output to allow mocking in tests laugh_output = self._run_get_output(laugh_detector_cmd, laugh_detector_dir) # ↑ have to include cwd to keep laughter-detection imports happy # also, it isn't happy if no output dir is specified but we get laughs so it's grand # laughs are lines in stdout that start with "instance:", followed by a space and a 2-tuple of floats # so jump to the 10th character and evaluate the rest of the line return [literal_eval(instance[10:]) for instance in laugh_output.splitlines() if instance.startswith("instance: ")] def _adjust_features(self) -> None: """Adjust features according to config Generically, this ensures features conform to config - min/max feature length, etc. In the context of LaughterFeatureExtractor, there is some secret sauce: things that cause a laugh generally /precede/ the laugh, so we want more team before the detected start than at the end. For example, for a minimum feature length of 15s, we might prepend 10 seconds, and append 5 seconds (for example), or 12s and 3s. We may wish to do this pre/post adjustment for all laughter features found, regardless of length. """ for feature in self.features: # do the pre & post adjustment feature.interval.move_start(-self._PREPEND_TIME, relative=True) feature.interval.move_end(self._APPEND_TIME, relative=True) def setup(self): """Setup the laughter feature extractor -- not needed. jrgillick's laughter-detection library can work with AV files directly! """ if not self.input_files or len(self.input_files) == 0: raise ValueError("No input files provided!") def run(self): """Extract laughter features for each input file. Heavy lifting is performed in _laughdetect() Tuples from _laughdetect are used to create Feature objects, which are appended to self.features by convention @see: utils.py:Feature, Interval """ if self.input_files: for file in self.input_files: laughs = self._laughdetect(file.path) for laugh in laughs: start, end = laugh self.features.append(Feature(interval=Interval(start=start, end=end), source=file, feature_extractor="laughter")) # adjust features self._adjust_features() def teardown(self): """No cleanup needed!""" class RandomFeatureExtractor(FeatureExtractor): """Feature extractor for random feature generation. This class is responsible for generating random features for testing purposes. Here: setup() is not needed run() is used to generate random features teardown() is not needed """ NUM_FEATURES = 30 MAX_DURATION = 15.0 MIN_DURATION = 5.0 def __init__(self, input_files=None, config=None): """It is expected that input_files is a SourceMedia object""" self.input_files = input_files self.config = config self.features = [] def setup(self): """Setup the random feature extractor -- validate input files & config""" logger.debug("RandomFeatureExtractor setup") # Validate input files if not self.input_files: raise ValueError("No input files provided") def run(self): """Generate random features for each input file""" # check self.input_files is of type SourceMedia if not self.input_files or not isinstance(self.input_files, SourceMedia): raise ValueError("No input files provided") for file in self.input_files: for _ in range(self.NUM_FEATURES): # determine duration between MIN and MAX, round to 3 decimal places duration = round(random.uniform(self.MIN_DURATION, self.MAX_DURATION), 3) start = random.random() * file.duration() - duration self.features.append(Feature(interval=Interval(start=start, duration=duration), source=file, feature_extractor="random")) def teardown(self): pass class LoudAudioFeatureExtractor(FeatureExtractor): """Feature extractor for loud audio detection. This class is responsible for extracting features corresponding to loud audio in media files. Here: setup() is used to validate input files & config, and extracting audio run() uses pyloudnorm to detect loud audio teardown() is used to clean up temporary files created during setup (if specified by config) """ _CONFIG_DEFAULT_NUM_FEATURES = 15 # keep the top 5 loudnesses _CONFIG_DEFAULT_MIN_DURATION = 5.00 # seconds def __init__(self, input_files=None, config=None, num_features=_CONFIG_DEFAULT_NUM_FEATURES, min_duration=_CONFIG_DEFAULT_MIN_DURATION): if not input_files: raise ValueError("No input files provided!") self.input_files = input_files self.config = config self.features = [] self._num_features = num_features self._min_duration = min_duration def _audio_file_from_path(self, path: str) -> str: """Return the audio file path given a video file path Example: - in = "/path/to/video.mp4" - out = "/tmp/video.mp4.wav" """ OUTPUT_DIR = "/tmp" return f"{OUTPUT_DIR}/{os.path.basename(path)}.wav" def _get_loudnesses(self, data, meter, rate, window_size, stride_size): """Extract loudnesses from the audio data using pyloudnorm return a list of 2-tuples, each representing a timecode and loudness value """ loudnesses = [] for w in range(0, len(data)-window_size, stride_size): window = data[w:w+window_size, 0:2] # extract window loudnesses.append( (w/rate, meter.integrated_loudness(window)) ) return loudnesses def _loudnorm(self, audio_file): """Run pyloudnorm on the audio file""" data, rate = soundfile.read(audio_file) # load audio (with shape (samples, channels)) meter = pyloudnorm.Meter(rate=rate,block_size=0.3) # create BS.1770 meter loudness_features = [] window_size = int(rate * 0.5) # 500ms stride_size = int(rate * 0.5) # 500ms -- no overlap # for w in range(data.shape[0]//100): # loudnesses.append(meter.integrated_loudness(data[w:w+int(0.3*rate),0:2])) loudnesses = self._get_loudnesses(data, meter, rate, window_size, stride_size) for timecode, loudval in sorted([l for l in loudnesses if float(l[1]) != float("-inf")], key=lambda x: x[1], reverse=True): # print(f"Timecode: {timecode}, Loudness: {loudval}") loudness_features.append((timecode, round(loudval, 3))) # round to 3 DP return loudness_features def _keep_num(self, features, num=_CONFIG_DEFAULT_NUM_FEATURES, margin=10.0) -> list: """Keep the top n features (default: 5) Approach: - for range in 0-n + expand the nth top feature to min duration (move start back by 0.5*min_duration, end forward by 0.5*min_duration) + drop any features that are now in that feature's range (plus margin) - return the top n features Each feature is a Feature object, with an Interval object """ keep_features = [] # ensure features are sorted by score features = sorted(features, key=lambda x: x.score, reverse=True) for i in range(num): current_feature = features.pop(0) # expand the feature to min_duration current_feature.interval.move_start(-0.5*self._min_duration, relative=True) current_feature.interval.move_end(0.5*self._min_duration, relative=True) keep_features.append(current_feature) # drop any features that are now in that feature's range (plus margin) features = [f for f in features if (f.interval.end < current_feature.interval.start-margin or f.interval.start > current_feature.interval.end+margin)] return keep_features def setup(self): """extract audio from video files to be processed by pyloudnorm TODO: config -- hardcoded for now """ # pyloudnorm expects WAV files for file in self.input_files: audio_file = self._audio_file_from_path(file.path) # ffmpeg -i input.mp4 -vn -acodec pcm_s16le output.wav subprocess.run(["ffmpeg", "-y", "-i", file.path, "-vn", "-acodec", "pcm_s16le", audio_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE) def run(self): """Use pyloudnorm to detect loud audio""" for file in self.input_files: audio_file = self._audio_file_from_path(file.path) loudnesses = self._loudnorm(audio_file) features = [] for time, loudness in loudnesses: features.append(Feature(interval=Interval(start=time, duration=0.500), source=file, feature_extractor="loudness", score=loudness)) # prune features list to keep self.num_features self.features = self._keep_num(features, self._num_features) class VideoActivityFeatureExtractor(FeatureExtractor): """Feature extractor for video activity detection. This class is responsible for extracting features corresponding to high activity in video files. Uses ffmpeg's scdet filter with threshold of zero. Here: setup() is used to validate input files & config run() is used to extract features from the video using OpenCV teardown() is used to clean up any temporary files created during setup according to the config #TODO: minimum duration -- consider whether to do here, or expand duration post-consolidation """ _CONFIG_DEFAULT_NUM_FEATURES = 15 # keep the top 5 activity moments _CONFIG_DEFAULT_MIN_DURATION = 5.00 # seconds def __init__(self, input_files=None, config=None, num_features=_CONFIG_DEFAULT_NUM_FEATURES, min_duration=_CONFIG_DEFAULT_MIN_DURATION): if not input_files: raise ValueError("No input files provided!") self.input_files = input_files self.config = config self.features = [] self._num_features = num_features self._min_duration = min_duration def _scdet(self, video_file): """Run scdet filter on the video file""" ffmpeg_cmd = ["ffmpeg", "-i", video_file, "-vf", "scdet=threshold=0", "-f", "null", "-"] # output is of the form: # [scdet @ 0x7f0798003d00] lavfi.scd.score: 0.031, lavfi.scd.time: 23.65 # [scdet @ 0x7f0798003d00] lavfi.scd.score: 0.006, lavfi.scd.time: 23.70 # capture output, extract time & score scdet_output = subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stderr.decode("utf-8") # extract time & score scores = [] for line in scdet_output.splitlines(): if "lavfi.scd.score" in line: scores.append( (float(line.split(",")[1].split(":")[1]), float(line.split(",")[0].split(":")[1])) ) return scores def _nonoverlap_mean(self, scores, window_size=0.500) -> list: """Take the mean of non-overlapping windows of scores Input: list of tuples in the format (time, score) Output: list of tuples in the format (time, mean_score) (reduced set) """ means = [] current_window = [] current_window_start = 0.0 for time, score in scores: if time - current_window_start > window_size: # calculate mean of current window mean_score = sum([s for _, s in current_window]) / len(current_window) means.append((current_window_start, round(mean_score, 3))) # reset window current_window = [] current_window_start = time current_window.append((time, score)) return means def _drop_lowest(self, scores, percent=33): """Drop the lowest n% scores from the list""" scores = sorted(scores, key=lambda x: x[1], reverse=True) return scores[:int(len(scores) * (percent / 100))] def _keep_num(self, features, num=_CONFIG_DEFAULT_NUM_FEATURES, margin=10.0) -> list: """Keep the top n features (default: 5) Approach: - for range in 0-n + expand the nth top feature to min duration (move start back by 0.5*min_duration, end forward by 0.5*min_duration) + drop any features that are now in that feature's range (plus margin) - return the top n features Each feature is a Feature object, with an Interval object """ keep_features = [] # ensure features are sorted by score features = sorted(features, key=lambda x: x.score, reverse=True) for i in range(num): current_feature = features.pop(0) # expand the feature to min_duration current_feature.interval.move_start(-0.5*self._min_duration, relative=True) current_feature.interval.move_end(0.5*self._min_duration, relative=True) keep_features.append(current_feature) # drop any features that are now in that feature's range (plus margin) features = [f for f in features if (f.interval.end < current_feature.interval.start-margin or f.interval.start > current_feature.interval.end+margin)] return keep_features def setup(self): pass def run(self): for file in self.input_files: scores = self._scdet(file.path) means = sorted(self._nonoverlap_mean(scores), key=lambda x: x[1], reverse=True) features = [] for time, score in self._drop_lowest(means, 66): features.append(Feature(interval=Interval(start=time, duration=0.500), source=file, feature_extractor="videoactivity", score=score)) # prune features list to keep self.num_features self.features = self._keep_num(features, self._num_features) def teardown(self): pass class JSONFeatureExtractor(FeatureExtractor): """(Re-)create features from a JSON file The JSON file can have one of two formats: - the format produced by the pipleline (@see: video_producers.py:JSONProducer) - a simplified format which is easier for manual creation """ def __init__(self, input_files=None, config=None): if not input_files: raise ValueError("No input files provided!") self.input_files = input_files self.config = config self.features = [] def setup(self): pass def _interval_from_dict(self, d): return Interval(start=d["start"], duration=d["duration"]) def _source_from_dict(self, d): return Source(d["source"], d["path"], d["provider"]) def _read_json_from_file(self, file): """Read a JSON file and return the contents Method exists to allow for mocking in tests """ with open(file, "r") as f: return json.load(f) def run(self): # only pipeline JSON format for now # TODO: add support for simplified format for file in self.input_files: features_from_json = self._read_json_from_file(file.path) for feature in features_from_json: self.features.append(Feature(interval=self._interval_from_dict(feature["interval"]), source=self._source_from_dict(feature["source"]), feature_extractor=feature["feature_extractor"], score=feature["score"])) def teardown(self): pass class WordFeatureExtractor(FeatureExtractor): """Feature extractor for specific word detection (uses Whisper)""" # set defaults for whisper settings DEFAULT_MODEL_SIZE = "medium" DEFAULT_DEVICE = "cpu" DEFAULT_COMPUTE_TYPE = "int8" DEFAULT_BEAM_SIZE = 5 DEFAULT_BATCH_SIZE = 16 DEFAULT_PIPELINE_TYPE = "batched" # or "stream" words = [] def _transcribe(self, model, file, **kwargs): """Defined here to allow for mocking in tests""" return model.transcribe(file, **kwargs) def _whispermodel(self, model_size=DEFAULT_MODEL_SIZE, device=DEFAULT_DEVICE, compute_type=DEFAULT_COMPUTE_TYPE): """Defined here to allow for mocking out in tests""" return WhisperModel(model_size, device=device, compute_type=compute_type) def _batched_inference_pipeline(self, model): """Defined here to allow for mocking out in tests""" return BatchedInferencePipeline(model=model) def __init__(self, input_files=None, config=None): if not input_files: raise ValueError("No input files provided!") self.input_files = input_files self.config = config self.features = [] def setup(self, words=[]): """Setup the word feature extractor -- validate input files & config Whisper expects a list of words to search for in the audio """ logger.debug("WordFeatureExtractor setup") # Validate words - raise a notice if none provided if len(words) == 0: logger.warning("No words provided for detection") self.words = words # TODO: consider stripping punctuation since Whisper produces words+punctuation # and we might want to strip the punctuation there too def run(self): """Extract features corresponding to supplied target words (defined in setup) for each input file Use Whisper to detect words in the audio, then match these to target words and create features Note: if no words are supplied we can exit early """ if len(self.words) == 0: return if self.DEFAULT_PIPELINE_TYPE == "batched": batched = True else: batched = False # no early exit # TODO: consider maybe loglevel notice of estimated time! consider also: max execution time config? # TODO: config options for model size, device, compute type model = self._whispermodel() # NB uses defaults, TODO: add config options # NOTE: batched not available on pypi yet at time of writing if batched: batched_model = self._batched_inference_pipeline(model) for file in self.input_files: # transcribe the audio file if batched: segments, _ = self._transcribe(batched_model, file.path, batch_size=self.DEFAULT_BATCH_SIZE) else: segments, _ = self._transcribe(model, file.path, beam_size=self.DEFAULT_BEAM_SIZE) # process the segments # segment has: start, end, text for segment in segments: # check if any of the words are in the segment for word in segment.text.split(): if word in self.words: self.features.append(Feature(interval=Interval(start=segment.start, end=segment.end), source=file, feature_extractor="word", score=1.0))