from abc import ABC
import json
import logging
import os
import random
import subprocess
from ast import literal_eval

from pipeline.utils import SourceMedia, Source, Feature, Interval

# for loudness detection
import soundfile
import pyloudnorm

logger = logging.getLogger(__name__)


class FeatureExtractor(ABC):
    """Feature extractor interface.

    Subclasses implement setup()/run()/teardown() and accumulate results
    in a `features` list of Feature objects.
    """
    # TODO: #API -- decide if .features will be a member variable

    def _run_get_output(self, cmd: list, cwd: str = ".") -> str:
        """Run a command and return its stdout as a string.

        Exit status is ignored deliberately (best-effort extraction).
        Defined to be mocked out in tests via unittest.mock.patch.
        """
        return subprocess.run(cmd, stdout=subprocess.PIPE, cwd=cwd).stdout.decode("utf-8")

    def setup(self):
        """Setup the feature extractor -- validate input files & config"""

    def run(self):
        """Run the feature extractor -- extract features"""

    def teardown(self):
        """Teardown the feature extractor -- clean up any temporary files created during setup"""


class LaughterFeatureExtractor(FeatureExtractor):
    """Feature extractor for laughter detection.

    This class is responsible for extracting features corresponding to laughter
    in media files.

    Here:

    setup() is used to validate input files & config, which may involve
            processing video files to extract audio

    run() is used to extract features from the audio using jrgillick's
          laughter-detection

    teardown() is used to clean up any temporary files created during setup
               according to the config

    See: https://github.com/jrgillick/laughter-detection for the
         laughter-detection library
    """
    def __init__(self, input_files=None, config=None):
        """It is expected that input_files is a SourceMedia object"""
        self.input_files = input_files
        self.config = config
        self.features = []

    def _laughdetect(self, audio_file) -> list:
        """Run laughter detection on the audio file.

        Returns a list of 2-tuples (start, end), each representing a laugh
        instance in the audio file.
        """
        laugh_detector_dir = "/home/robert/mounts/980data/code/laughter-detection/"
        laugh_detector_script = "segment_laughter.py"
        # fake output for testing
        # laugh_detector_path = "tests/fake_segment_laughter.py"
        laugh_detector_cmd = ["python",
                              f"{laugh_detector_dir}{laugh_detector_script}",
                              f"--input_audio_file={audio_file}"]

        # run command, capture output, ignore exit status
        # use self._run_get_output to allow mocking in tests
        laugh_output = self._run_get_output(laugh_detector_cmd, laugh_detector_dir)
        # ↑ have to include cwd to keep laughter-detection imports happy
        # also, it isn't happy if no output dir is specified but we get laughs so it's grand

        # laughs are lines in stdout that start with "instance:", followed by
        # a space and a 2-tuple of floats -- so jump to the 10th character and
        # evaluate the rest of the line
        return [literal_eval(instance[10:])
                for instance in laugh_output.splitlines()
                if instance.startswith("instance: ")]

    def _adjust_features(self):
        """Adjust features according to config.

        Generically, this ensures features conform to config - min/max feature
        length, etc.

        In the context of LaughterFeatureExtractor, there is some secret sauce:
        things that cause a laugh generally /precede/ the laugh, so we want
        more time before the detected start than at the end. For example, for
        a minimum feature length of 15s, we might prepend 10 seconds, and
        append 5 seconds (for example), or 12s and 3s.

        We may wish to do this pre/post adjustment for all laughter features
        found, regardless of length.

        TODO: figure out how we're going to handle length adjustments
        TODO: config for length adjustments per design doc
        TODO: play with numbers more to see what works best
        """
        PREPEND = 7.0
        APPEND = 3.0
        for feature in self.features:
            # do the pre & post adjustment
            feature.interval.move_start(-PREPEND, relative=True)
            feature.interval.move_end(APPEND, relative=True)

    def setup(self):
        """Setup the laughter feature extractor -- validate input files & config.

        jrgillick's laughter-detection library can work with AV files directly.

        TODO: validate input files
        TODO: handle config
        """
        logger.debug("LaughterFeatureExtractor setup")

        # Validate input files
        if not self.input_files:
            raise ValueError("No input files provided")

        # TODO: convert video to audio if needed

    def run(self):
        """Extract laughter features for each input file"""
        if self.input_files:
            for file in self.input_files:
                # adjust this call for better test mocking
                laughs = self._laughdetect(file.path)
                for laugh in laughs:
                    start, end = laugh
                    self.features.append(Feature(interval=Interval(start=start, end=end),
                                                 source=file, feature_extractor="laughter"))

        # TODO: implement options eg minimum feature length
        # adjust features
        self._adjust_features()

    def teardown(self):
        pass


class RandomFeatureExtractor(FeatureExtractor):
    """Feature extractor for random feature generation.

    This class is responsible for generating random features for testing
    purposes.

    Here:

    setup() is used to validate input files & config

    run() is used to generate random features

    teardown() is used to clean up any temporary files created during setup
               according to the config
    """
    NUM_FEATURES = 5
    MAX_DURATION = 20.0

    def __init__(self, input_files=None, config=None):
        """It is expected that input_files is a SourceMedia object"""
        self.input_files = input_files
        self.config = config
        self.features = []

    def setup(self):
        """Setup the random feature extractor -- validate input files & config"""
        logger.debug("RandomFeatureExtractor setup")

        # Validate input files
        if not self.input_files:
            raise ValueError("No input files provided")

    def run(self):
        """Generate random features for each input file"""
        # check self.input_files is of type SourceMedia
        if not self.input_files or not isinstance(self.input_files, SourceMedia):
            raise ValueError("No input files provided")

        for file in self.input_files:
            for _ in range(self.NUM_FEATURES):
                # round to 3 decimal places
                duration = round(random.random() * self.MAX_DURATION, 3)
                # BUGFIX: parenthesised so start lies in [0, file duration - duration];
                # previously `random.random() * file.duration() - duration`, which
                # could produce negative start times
                start = round(random.random() * (file.duration() - duration), 3)
                self.features.append(Feature(interval=Interval(start=start, duration=duration),
                                             source=file, feature_extractor="random"))

    def teardown(self):
        pass


class LoudAudioFeatureExtractor(FeatureExtractor):
    """Feature extractor for loud audio detection.

    This class is responsible for extracting features corresponding to loud
    audio in media files.

    Here:

    setup() is used to validate input files & config, and extracting audio

    run() uses pyloudnorm to detect loud audio

    teardown() is used to clean up temporary files created during setup
               (if specified by config)
    """
    def __init__(self, input_files=None, config=None):
        if not input_files:
            raise ValueError("No input files provided!")
        self.input_files = input_files
        self.config = config
        self.features = []

    def _audio_file_from_path(self, path: str) -> str:
        """Return the audio file path given a video file path.

        Example:
        - in = "/path/to/video.mp4"
        - out = "/tmp/video.mp4.wav"
        """
        OUTPUT_DIR = "/tmp"
        return f"{OUTPUT_DIR}/{os.path.basename(path)}.wav"

    def _get_loudnesses(self, data, meter, rate, window_size, stride_size):
        """Extract loudnesses from the audio data using pyloudnorm.

        Returns a list of 2-tuples, each representing a timecode (seconds)
        and loudness value.
        """
        loudnesses = []
        for w in range(0, len(data) - window_size, stride_size):
            # assumes at least 2 channels in `data` -- TODO confirm upstream
            window = data[w:w + window_size, 0:2]  # extract window
            loudnesses.append((w / rate, meter.integrated_loudness(window)))
        return loudnesses

    def _loudnorm(self, audio_file):
        """Run pyloudnorm on the audio file.

        Returns (timecode, loudness) pairs sorted loudest-first, with
        -inf (silence) entries filtered out.
        """
        data, rate = soundfile.read(audio_file)  # load audio (with shape (samples, channels))
        meter = pyloudnorm.Meter(rate=rate, block_size=0.3)  # create BS.1770 meter
        loudness_features = []
        window_size = int(rate * 0.5)  # 500ms
        stride_size = int(rate * 0.5)  # 500ms -- no overlap

        loudnesses = self._get_loudnesses(data, meter, rate, window_size, stride_size)

        for timecode, loudval in sorted([l for l in loudnesses if float(l[1]) != float("-inf")],
                                        key=lambda x: x[1], reverse=True):
            loudness_features.append((timecode, round(loudval, 3)))  # round to 3 DP

        return loudness_features

    def setup(self):
        """Extract audio from video files to be processed by pyloudnorm.

        TODO: config -- hardcoded for now
        """
        # pyloudnorm expects WAV files
        for file in self.input_files:
            audio_file = self._audio_file_from_path(file.path)

            # ffmpeg -i input.mp4 -vn -acodec pcm_s16le output.wav
            subprocess.run(["ffmpeg", "-y", "-i", file.path, "-vn", "-acodec",
                            "pcm_s16le", audio_file],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    def run(self):
        """Use pyloudnorm to detect loud audio"""
        for file in self.input_files:
            audio_file = self._audio_file_from_path(file.path)
            loudnesses = self._loudnorm(audio_file)
            for time, loudness in loudnesses:
                self.features.append(Feature(interval=Interval(start=time, duration=0.500),
                                             source=file, feature_extractor="loudness",
                                             score=loudness))


class VideoActivityFeatureExtractor(FeatureExtractor):
    """Feature extractor for video activity detection.

    This class is responsible for extracting features corresponding to high
    activity in video files.

    Uses ffmpeg's scdet filter with threshold of zero.

    Here:

    setup() is used to validate input files & config

    run() is used to extract features from the video using ffmpeg's scdet
          filter

    teardown() is used to clean up any temporary files created during setup
               according to the config

    #TODO: minimum duration -- consider whether to do here, or expand
           duration post-consolidation
    """
    def __init__(self, input_files=None, config=None):
        if not input_files:
            raise ValueError("No input files provided!")
        self.input_files = input_files
        self.config = config
        self.features = []

    def _scdet(self, video_file):
        """Run scdet filter on the video file.

        Returns a list of (time, score) tuples parsed from ffmpeg's stderr.
        """
        ffmpeg_cmd = ["ffmpeg", "-i", video_file, "-vf", "scdet=threshold=0",
                      "-f", "null", "-"]
        # output is of the form:
        # [scdet @ 0x7f0798003d00] lavfi.scd.score: 0.031, lavfi.scd.time: 23.65
        # [scdet @ 0x7f0798003d00] lavfi.scd.score: 0.006, lavfi.scd.time: 23.70
        # capture output, extract time & score
        scdet_output = subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE).stderr.decode("utf-8")
        # extract time & score
        scores = []
        for line in scdet_output.splitlines():
            if "lavfi.scd.score" in line:
                scores.append((float(line.split(",")[1].split(":")[1]),
                               float(line.split(",")[0].split(":")[1])))
        return scores

    def _nonoverlap_mean(self, scores, window_size=0.500) -> list:
        """Take the mean of non-overlapping windows of scores.

        Input: list of tuples in the format (time, score)
        Output: list of tuples in the format (time, mean_score) (reduced set)

        NOTE: the trailing (incomplete) window is dropped, preserving the
        original behaviour.
        """
        means = []
        current_window = []
        current_window_start = 0.0

        for time, score in scores:
            if time - current_window_start > window_size:
                # BUGFIX: guard against an empty window (e.g. when the first
                # score arrives after the first window has already elapsed),
                # which previously raised ZeroDivisionError
                if current_window:
                    # calculate mean of current window
                    mean_score = sum(s for _, s in current_window) / len(current_window)
                    means.append((current_window_start, round(mean_score, 3)))
                # reset window
                current_window = []
                current_window_start = time
            current_window.append((time, score))

        return means

    def _drop_lowest(self, scores, percent=33):
        """Keep the top `percent`% of scores, sorted highest-first.

        NOTE(review): the name suggests *dropping* the lowest `percent`%, but
        the implementation *keeps* `percent`% (dropping the lowest
        100-percent %). Callers rely on the current behaviour, so it is
        documented rather than changed here.
        """
        scores = sorted(scores, key=lambda x: x[1], reverse=True)
        return scores[:int(len(scores) * (percent / 100))]

    def setup(self):
        pass

    def run(self):
        for file in self.input_files:
            scores = self._scdet(file.path)
            means = sorted(self._nonoverlap_mean(scores), key=lambda x: x[1], reverse=True)
            for time, score in self._drop_lowest(means, 66):
                self.features.append(Feature(interval=Interval(start=time, duration=0.500),
                                             source=file, feature_extractor="videoactivity",
                                             score=score))

    def teardown(self):
        pass


class JSONFeatureExtractor(FeatureExtractor):
    """(Re-)create features from a JSON file.

    The JSON file can have one of two formats:
    - the format produced by the pipeline (@see: video_producers.py:JSONProducer)
    - a simplified format which is easier for manual creation
    """
    def __init__(self, input_files=None, config=None):
        if not input_files:
            raise ValueError("No input files provided!")
        self.input_files = input_files
        self.config = config
        self.features = []

    def setup(self):
        pass

    def _interval_from_dict(self, d):
        # build an Interval from the pipeline JSON representation
        return Interval(start=d["start"], duration=d["duration"])

    def _source_from_dict(self, d):
        # build a Source from the pipeline JSON representation
        return Source(d["source"], d["path"], d["provider"])

    def _read_json_from_file(self, file):
        """Read a JSON file and return the contents.

        Method exists to allow for mocking in tests.
        """
        with open(file, "r") as f:
            return json.load(f)

    def run(self):
        # only pipeline JSON format for now
        # TODO: add support for simplified format
        for file in self.input_files:
            features_from_json = self._read_json_from_file(file.path)
            for feature in features_from_json:
                self.features.append(Feature(interval=self._interval_from_dict(feature["interval"]),
                                             source=self._source_from_dict(feature["source"]),
                                             feature_extractor=feature["feature_extractor"],
                                             score=feature["score"]))

    def teardown(self):
        pass