from abc import ABC
import json
import logging
import os
import random
import subprocess
from ast import literal_eval
from pipeline.utils import SourceMedia, Source, Feature, Interval

# for loudness detection
import soundfile
import pyloudnorm

# for word detection
from faster_whisper import WhisperModel, BatchedInferencePipeline

logger = logging.getLogger(__name__)

class FeatureExtractor(ABC):
    """Feature extractor interface."""
    # TODO: #API -- decide if .features will be a member variable
    def _run_get_output(self, cmd: list, cwd: str = ".") -> str:
        """Run a command and return the output as a string

        Defined to be mocked out in tests via unittest.mock.patch
        """
        return subprocess.run(cmd, stdout=subprocess.PIPE, cwd=cwd).stdout.decode("utf-8")
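    # _run_get_output can then be patched in tests, e.g. (assumed test sketch,
    # not a test that exists in this repo):
    #   with unittest.mock.patch.object(FeatureExtractor, "_run_get_output",
    #                                   return_value="instance: (1.0, 2.0)"):
    #       ...  # laughter detection then sees the canned output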

    def setup(self):
        """Setup the feature extractor -- validate input files & config"""

    def run(self):
        """Run the feature extractor -- extract features"""

    def teardown(self):
        """Teardown the feature extractor -- clean up any temporary files created during setup"""

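# Typical lifecycle of a concrete extractor (illustrative sketch; any of the
# subclasses below can stand in for LaughterFeatureExtractor):
#
#   extractor = LaughterFeatureExtractor(input_files=sources, config=config)
#   extractor.setup()     # validate inputs, prepare audio if needed
#   extractor.run()       # populates extractor.features
#   extractor.teardown()  # remove temporary files per config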
class LaughterFeatureExtractor(FeatureExtractor):
    """Feature extractor for laughter detection.

    This class is responsible for extracting features corresponding to laughter in media files.

    Here:

    setup() is used to validate input files & config, which may involve processing video files to extract audio

    run() is used to extract features from the audio using jrgillick's laughter-detection

    teardown() is used to clean up any temporary files created during setup according to the config

    See: https://github.com/jrgillick/laughter-detection for the laughter-detection library
    """

    def __init__(self, input_files=None, config=None):
        """It is expected that input_files is a SourceMedia object"""
        self.input_files = input_files
        self.config = config
        self.features = []

    def _laughdetect(self, audio_file) -> list:
        """Run laughter detection on the audio file

        Returns a list of 2-tuples, each representing a laugh instance in the audio file
        """
        laugh_detector_dir = "/home/robert/mounts/980data/code/laughter-detection/"
        laugh_detector_script = "segment_laughter.py"
        # fake output for testing
        # laugh_detector_path = "tests/fake_segment_laughter.py"
        laugh_detector_cmd = ["python", f"{laugh_detector_dir}{laugh_detector_script}",
                              f"--input_audio_file={audio_file}"]

        # run command, capture output, ignore exit status
        # use self._run_get_output to allow mocking in tests
        laugh_output = self._run_get_output(laugh_detector_cmd, laugh_detector_dir)

        # cwd must be the laughter-detection dir to keep its imports happy;
        # it also complains if no output dir is specified, but the laugh
        # instances still appear on stdout, so that's fine

        # laughs are lines in stdout that start with "instance:", followed by a space and a 2-tuple of floats
        # so strip the 10-character "instance: " prefix and literal_eval the rest of the line
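        # e.g. the stdout line 'instance: (43.17, 44.98)' yields (43.17, 44.98)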
        return [literal_eval(instance[10:])
                for instance in laugh_output.splitlines()
                if instance.startswith("instance: ")]

    def _adjust_features(self):
        """Adjust features according to config

        Generically, this ensures features conform to config - min/max feature length, etc.

        In the context of LaughterFeatureExtractor, there is some secret sauce: things that
        cause a laugh generally /precede/ the laugh, so we want more time before the detected
        start than after the end. For example, for a minimum feature length of 15s, we might
        prepend 10 seconds and append 5 seconds, or prepend 12s and append 3s. We may wish to
        do this pre/post adjustment for all laughter features found, regardless of length.

        TODO: figure out how we're going to handle length adjustments
        TODO: config for length adjustments per design doc
        TODO: play with numbers more to see what works best
        """
        PREPEND = 7.0
        APPEND = 3.0

        for feature in self.features:
            # do the pre & post adjustment
            feature.interval.move_start(-PREPEND, relative=True)
            feature.interval.move_end(APPEND, relative=True)
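        # e.g. a laugh detected at (100.0, 102.0) becomes the interval
        # [93.0, 105.0] after the 7s prepend and 3s append above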

    def setup(self):
        """Setup the laughter feature extractor -- validate input files & config

        jrgillick's laughter-detection library can work with AV files directly

        TODO: validate input files
        TODO: handle config
        """
        logger.debug("LaughterFeatureExtractor setup")

        # Validate input files
        if not self.input_files:
            raise ValueError("No input files provided")

        # TODO: convert video to audio if needed

    def run(self):
        """Extract laughter features for each input file"""
        if self.input_files:
            for file in self.input_files:
                # call via _laughdetect so tests can mock the laughter detector
                laughs = self._laughdetect(file.path)
                for laugh in laughs:
                    start, end = laugh
                    self.features.append(Feature(interval=Interval(start=start, end=end),
                                                 source=file, feature_extractor="laughter"))
                # TODO: implement options eg minimum feature length

        # adjust features
        self._adjust_features()

    def teardown(self):
        pass


class RandomFeatureExtractor(FeatureExtractor):
    """Feature extractor for random feature generation.

    This class is responsible for generating random features for testing purposes.

    Here:

    setup() is used to validate input files & config

    run() is used to generate random features

    teardown() is used to clean up any temporary files created during setup according to the config
    """
    NUM_FEATURES = 5
    MAX_DURATION = 20.0

    def __init__(self, input_files=None, config=None):
        """It is expected that input_files is a SourceMedia object"""
        self.input_files = input_files
        self.config = config
        self.features = []

    def setup(self):
        """Setup the random feature extractor -- validate input files & config"""
        logger.debug("RandomFeatureExtractor setup")

        # Validate input files
        if not self.input_files:
            raise ValueError("No input files provided")

    def run(self):
        """Generate random features for each input file"""
        # check self.input_files is of type SourceMedia
        if not self.input_files or not isinstance(self.input_files, SourceMedia):
            raise ValueError("No input files provided")

        for file in self.input_files:
            for _ in range(self.NUM_FEATURES):
                # pick a random duration, rounded to 3 decimal places
                duration = round(random.random() * self.MAX_DURATION, 3)
                # pick a random start such that the feature fits within the file
                start = round(random.random() * (file.duration() - duration), 3)
                self.features.append(Feature(interval=Interval(start=start, duration=duration),
                                             source=file, feature_extractor="random"))

    def teardown(self):
        pass


class LoudAudioFeatureExtractor(FeatureExtractor):
    """Feature extractor for loud audio detection.

    This class is responsible for extracting features corresponding to loud audio in media files.

    Here:

    setup() is used to validate input files & config, and extract audio

    run() uses pyloudnorm to detect loud audio

    teardown() is used to clean up temporary files created during setup (if specified by config)
    """
    def __init__(self, input_files=None, config=None):
        if not input_files:
            raise ValueError("No input files provided!")
        self.input_files = input_files
        self.config = config
        self.features = []

    def _audio_file_from_path(self, path: str) -> str:
        """Return the audio file path given a video file path

        Example:
        - in = "/path/to/video.mp4"
        - out = "/tmp/video.mp4.wav"
        """
        OUTPUT_DIR = "/tmp"
        return f"{OUTPUT_DIR}/{os.path.basename(path)}.wav"

    def _get_loudnesses(self, data, meter, rate, window_size, stride_size):
        """Extract loudnesses from the audio data using pyloudnorm

        return a list of 2-tuples, each representing a timecode and loudness value
        """
        loudnesses = []

        for w in range(0, len(data) - window_size, stride_size):
            window = data[w:w + window_size, 0:2]  # extract window
            loudnesses.append((w / rate, meter.integrated_loudness(window)))

        return loudnesses
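    # e.g. at rate=48000 with 0.5s windows and strides, window_size ==
    # stride_size == 24000 samples, yielding readings like
    # [(0.0, -23.1), (0.5, -20.4), ...] (loudness values illustrative)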

    def _loudnorm(self, audio_file):
        """Run pyloudnorm on the audio file"""
        data, rate = soundfile.read(audio_file)  # load audio (with shape (samples, channels))
        meter = pyloudnorm.Meter(rate=rate, block_size=0.3)  # create BS.1770 meter

        loudness_features = []
        window_size = int(rate * 0.5)  # 500ms
        stride_size = int(rate * 0.5)  # 500ms -- no overlap

        loudnesses = self._get_loudnesses(data, meter, rate, window_size, stride_size)

        # sort by loudness (descending), discarding silent (-inf) windows
        for timecode, loudval in sorted([l for l in loudnesses if float(l[1]) != float("-inf")],
                                        key=lambda x: x[1], reverse=True):
            loudness_features.append((timecode, round(loudval, 3)))  # round to 3 DP

        return loudness_features

    def setup(self):
        """extract audio from video files to be processed by pyloudnorm

        TODO: config -- hardcoded for now
        """
        # pyloudnorm expects WAV files
        for file in self.input_files:
            audio_file = self._audio_file_from_path(file.path)
            # ffmpeg -i input.mp4 -vn -acodec pcm_s16le output.wav
            subprocess.run(["ffmpeg", "-y", "-i", file.path, "-vn", "-acodec", "pcm_s16le", audio_file],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    def run(self):
        """Use pyloudnorm to detect loud audio"""
        for file in self.input_files:
            audio_file = self._audio_file_from_path(file.path)
            loudnesses = self._loudnorm(audio_file)
            for time, loudness in loudnesses:
                self.features.append(Feature(interval=Interval(start=time, duration=0.500),
                                             source=file, feature_extractor="loudness",
                                             score=loudness))
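
    def teardown(self):
        """Remove the temporary WAV files created in setup()

        Sketch implementation: the class docstring says cleanup should be
        config-gated, but no config key exists yet, so this assumes
        unconditional cleanup is acceptable.
        """
        for file in self.input_files:
            audio_file = self._audio_file_from_path(file.path)
            if os.path.exists(audio_file):
                os.remove(audio_file)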


class VideoActivityFeatureExtractor(FeatureExtractor):
    """Feature extractor for video activity detection.

    This class is responsible for extracting features corresponding to high activity in video files.

    Uses ffmpeg's scdet filter with threshold of zero.

    Here:

    setup() is used to validate input files & config

    run() is used to extract features from the video using ffmpeg's scdet filter

    teardown() is used to clean up any temporary files created during setup according to the config

    #TODO: minimum duration -- consider whether to do here, or expand duration post-consolidation
    """
    def __init__(self, input_files=None, config=None):
        if not input_files:
            raise ValueError("No input files provided!")
        self.input_files = input_files
        self.config = config
        self.features = []

    def _scdet(self, video_file):
        """Run scdet filter on the video file"""
        ffmpeg_cmd = ["ffmpeg", "-i", video_file, "-vf", "scdet=threshold=0", "-f", "null", "-"]
        # output is of the form:
        # [scdet @ 0x7f0798003d00] lavfi.scd.score: 0.031, lavfi.scd.time: 23.65
        # [scdet @ 0x7f0798003d00] lavfi.scd.score: 0.006, lavfi.scd.time: 23.70
        # capture output, extract time & score
        scdet_output = subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stderr.decode("utf-8")
        # extract time & score
        scores = []
        for line in scdet_output.splitlines():
            if "lavfi.scd.score" in line:
                scores.append((float(line.split(",")[1].split(":")[1]),
                               float(line.split(",")[0].split(":")[1])))
        return scores
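    # e.g. the first sample line above parses to (23.65, 0.031),
    # i.e. (time, score)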

    def _nonoverlap_mean(self, scores, window_size=0.500) -> list:
        """Take the mean of non-overlapping windows of scores

        Input: list of tuples in the format (time, score)
        Output: list of tuples in the format (time, mean_score) (reduced set)
        """
        means = []
        current_window = []
        current_window_start = 0.0

        for time, score in scores:
            # close the window once it spans window_size (the empty-window
            # check avoids a ZeroDivisionError when the first score arrives
            # later than window_size)
            if time - current_window_start > window_size and current_window:
                # calculate mean of current window
                mean_score = sum(s for _, s in current_window) / len(current_window)
                means.append((current_window_start, round(mean_score, 3)))
                # reset window
                current_window = []
                current_window_start = time
            current_window.append((time, score))

        # flush the final (possibly partial) window
        if current_window:
            mean_score = sum(s for _, s in current_window) / len(current_window)
            means.append((current_window_start, round(mean_score, 3)))

        return means
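    # e.g. [(0.0, 0.2), (0.25, 0.4), (0.75, 0.6)] with the default window size
    # reduces to [(0.0, 0.3), (0.75, 0.6)]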

    def _drop_lowest(self, scores, percent=33):
        """Keep only the highest-scoring `percent`% of the list

        (i.e. drop the lowest (100 - percent)% of scores)
        """
        scores = sorted(scores, key=lambda x: x[1], reverse=True)
        return scores[:int(len(scores) * (percent / 100))]
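    # e.g. with 10 scores and percent=66, the 6 highest-scoring entries survive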

    def setup(self):
        pass

    def run(self):
        for file in self.input_files:
            scores = self._scdet(file.path)
            means = sorted(self._nonoverlap_mean(scores), key=lambda x: x[1], reverse=True)
            for time, score in self._drop_lowest(means, 66):
                self.features.append(Feature(interval=Interval(start=time, duration=0.500),
                                             source=file, feature_extractor="videoactivity",
                                             score=score))

    def teardown(self):
        pass


class JSONFeatureExtractor(FeatureExtractor):
    """(Re-)create features from a JSON file

    The JSON file can have one of two formats:
    - the format produced by the pipeline (@see: video_producers.py:JSONProducer)
    - a simplified format which is easier for manual creation
    """

    def __init__(self, input_files=None, config=None):
        if not input_files:
            raise ValueError("No input files provided!")
        self.input_files = input_files
        self.config = config
        self.features = []

    def setup(self):
        pass

    def _interval_from_dict(self, d):
        return Interval(start=d["start"], duration=d["duration"])

    def _source_from_dict(self, d):
        return Source(d["source"], d["path"], d["provider"])

    def _read_json_from_file(self, file):
        """Read a JSON file and return the contents

        Method exists to allow for mocking in tests
        """
        with open(file, "r") as f:
            return json.load(f)

    def run(self):
        # only pipeline JSON format for now
        # TODO: add support for simplified format
        for file in self.input_files:
            features_from_json = self._read_json_from_file(file.path)

            for feature in features_from_json:
                self.features.append(Feature(interval=self._interval_from_dict(feature["interval"]),
                                             source=self._source_from_dict(feature["source"]),
                                             feature_extractor=feature["feature_extractor"],
                                             score=feature["score"]))

    def teardown(self):
        pass


class WordFeatureExtractor(FeatureExtractor):
    """Feature extractor for specific word detection (uses Whisper)"""
    def __init__(self, input_files=None, config=None):
        if not input_files:
            raise ValueError("No input files provided!")
        self.input_files = input_files
        self.config = config
        self.features = []

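    # The constants and helper methods below are referenced by setup()/run()
    # but were missing from this file; these are assumed, sketch definitions.
    # The model size and batch/beam sizes are illustrative defaults, not values
    # confirmed elsewhere in the pipeline.
    DEFAULT_PIPELINE_TYPE = "batched"
    DEFAULT_MODEL_SIZE = "large-v3"  # assumption: any faster-whisper model name
    DEFAULT_BATCH_SIZE = 16
    DEFAULT_BEAM_SIZE = 5

    def _whispermodel(self):
        """Create a faster-whisper model -- wrapped so tests can mock it"""
        return WhisperModel(self.DEFAULT_MODEL_SIZE)

    def _batched_inference_pipeline(self, model):
        """Wrap a model for batched inference -- wrapped so tests can mock it"""
        return BatchedInferencePipeline(model=model)

    def _transcribe(self, model, *args, **kwargs):
        """Call model.transcribe -- wrapped so tests can mock it"""
        return model.transcribe(*args, **kwargs)
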
    def setup(self, words=None):
        """Setup the word feature extractor -- validate input files & config

        Whisper expects a list of words to search for in the audio
        """
        logger.debug("WordFeatureExtractor setup")

        # Validate input files
        if not self.input_files:
            raise ValueError("No input files provided")

        # Validate words - warn if none provided
        # (a default of None avoids the mutable-default-argument pitfall)
        self.words = words or []
        if not self.words:
            logger.warning("No words provided for detection")
        # TODO: consider stripping punctuation since Whisper produces words+punctuation
        # and we might want to strip the punctuation there too

    def run(self):
        """Extract features corresponding to supplied target words (defined in setup) for each input file

        Use Whisper to detect words in the audio, then match these to target words and create features

        Note: if no words are supplied we can exit early
        """
        if len(self.words) == 0:
            return

        batched = self.DEFAULT_PIPELINE_TYPE == "batched"

        # TODO: consider maybe loglevel notice of estimated time! consider also: max execution time config?
        # TODO: config options for model size, device, compute type
        model = self._whispermodel()  # NB uses defaults, TODO: add config options

        # NOTE: batched not available on pypi yet at time of writing
        if batched:
            batched_model = self._batched_inference_pipeline(model)

        for file in self.input_files:
            # transcribe the audio file
            if batched:
                segments, _ = self._transcribe(batched_model, file.path, batch_size=self.DEFAULT_BATCH_SIZE)
            else:
                segments, _ = self._transcribe(model, file.path, beam_size=self.DEFAULT_BEAM_SIZE)

            # process the segments
            # segment has: start, end, text
            for segment in segments:
                # check if any of the words are in the segment
                for word in segment.text.split():
                    if word in self.words:
                        self.features.append(Feature(interval=Interval(start=segment.start, end=segment.end),
                                                     source=file, feature_extractor="word",
                                                     score=1.0))