- from abc import ABC
- import logging
- import os
- import random
- import subprocess
- from ast import literal_eval
- from pipeline.utils import SourceMedia, Feature, Interval
- # for loudness detection
- import soundfile
- import pyloudnorm
-
- logger = logging.getLogger(__name__)
-
- class FeatureExtractor(ABC):
- """Feature extractor interface."""
- # TODO: #API -- decide if .features will be a member variable
- def setup(self):
- pass
-
- def run(self):
- pass
-
- def teardown(self):
- pass
-
- class LaughterFeatureExtractor(FeatureExtractor):
- """Feature extractor for laughter detection.
-
- This class is responsible for extracting features corresponding to laughter in media files.
-
- Here:
-
- setup() is used to validate input files & config, which may involve processing video files to extract audio
-
- run() is used to extract features from the audio using jrgillick's laughter-detection
-
- teardown() is used to clean up any temporary files created during setup according to the config
-
- See: https://github.com/jrgillick/laughter-detection for the laughter-detection library
- """
-
- def __init__(self, input_files=None, config=None):
- """It is expected that input_files is a SourceMedia object"""
- self.input_files = input_files
- self.config = config
- self.features = []
-
- def _laughdetect(self, audio_file):
- """Run laughter detection on the audio file"""
- laugh_detector_dir = "/home/robert/mounts/980data/code/laughter-detection/"
- laugh_detector_script = "segment_laughter.py"
- # fake output for testing
- # laugh_detector_path = "tests/fake_segment_laughter.py"
- laugh_detector_cmd = ["python", f"{laugh_detector_dir}{laugh_detector_script}",
- f"--input_audio_file={audio_file}"]
-
- # run command, capture output, ignore exit status
- laugh_output = subprocess.run(laugh_detector_cmd,
- stdout=subprocess.PIPE,
- cwd=laugh_detector_dir).stdout.decode("utf-8")
- # ↑ have to include cwd to keep laughter-detection imports happy
- # it also complains if no output dir is specified, but the laugh instances still appear on stdout, so that's fine
-
- # laughs appear in stdout as lines of the form "instance: (start, end)", e.g. "instance: (12.34, 15.67)"
- # so skip the first 10 characters ("instance: ") and evaluate the rest of the line as a 2-tuple of floats
- return [literal_eval(instance[10:])
- for instance in laugh_output.splitlines()
- if instance.startswith("instance: ")]
-
- def _adjust_features(self):
- """Adjust features according to config
-
- Generically, this ensures features conform to config - min/max feature length, etc.
-
- In the context of LaughterFeatureExtractor, there is some secret sauce: things that
- cause a laugh generally /precede/ the laugh, so we want more time before the detected start
- than at the end. For example, for a minimum feature length of 15s, we might prepend 10 seconds
- and append 5 seconds, or 12s and 3s. We may wish to do this pre/post adjustment
- for all laughter features found, regardless of length.
-
- TODO: figure out how we're going to handle length adjustments
- TODO: config for length adjustments per design doc
- TODO: play with numbers more to see what works best
- """
- PREPEND = 7.0
- APPEND = 3.0
-
- for feature in self.features:
- # do the pre & post adjustment
- feature.interval.move_start(-PREPEND, relative=True)
- feature.interval.move_end(APPEND, relative=True)
-
- def setup(self):
- """Setup the laughter feature extractor -- validate input files & config
-
- jrgillick's laughter-detection library can work with AV files directly
-
- TODO: validate input files
- TODO: handle config
- """
- logger.debug("LaughterFeatureExtractor setup")
-
- # Validate input files
- if not self.input_files:
- raise ValueError("No input files provided")
-
- # TODO: convert video to audio if needed
-
- def run(self):
- """Extract laughter features for each input file"""
- if self.input_files:
- for file in self.input_files:
- laughs = self._laughdetect(file.path)
- for laugh in laughs:
- start, end = laugh
- self.features.append(Feature(interval=Interval(start=start, end=end),
- source="laughter", path=file.path))
- # TODO: implement options eg minimum feature length
-
- # adjust features
- self._adjust_features()
-
- def teardown(self):
- pass
-
- class RandomFeatureExtractor(FeatureExtractor):
- """Feature extractor for random feature generation.
-
- This class is responsible for generating random features for testing purposes.
-
- Here:
-
- setup() is used to validate input files & config
-
- run() is used to generate random features
-
- teardown() is a no-op here, since no temporary files are created
- """
- NUM_FEATURES = 5
- MAX_DURATION = 20.0
-
- def __init__(self, input_files=None, config=None):
- """It is expected that input_files is a SourceMedia object"""
- self.input_files = input_files
- self.config = config
- self.features = []
-
- def setup(self):
- """Setup the random feature extractor -- validate input files & config"""
- logger.debug("RandomFeatureExtractor setup")
-
- # Validate input files
- if not self.input_files:
- raise ValueError("No input files provided")
-
- def run(self):
- """Generate random features for each input file"""
- # check self.input_files is of type SourceMedia
- if not self.input_files or not isinstance(self.input_files, SourceMedia):
- raise ValueError("Input files must be provided as a SourceMedia object")
-
- for file in self.input_files:
- for _ in range(self.NUM_FEATURES):
- # pick a random duration (up to MAX_DURATION) and a start time that keeps the
- # feature inside the file, both rounded to 3 decimal places
- duration = round(random.random() * self.MAX_DURATION, 3)
- start = round(random.random() * max(file.duration() - duration, 0), 3)
- self.features.append(Feature(interval=Interval(start=start, duration=duration),
- source="random", path=file.path))
- class LoudAudioFeatureExtractor(FeatureExtractor):
- """Feature extractor for loud audio detection.
-
- This class is responsible for extracting features corresponding to loud audio in media files.
-
- Here:
-
- setup() is used to validate input files & config, and to extract audio from the video files
-
- run() uses pyloudnorm to detect loud audio
-
- teardown() is used to clean up temporary files created during setup (if specified by config)
- """
- def __init__(self, input_files=None, config=None):
- if not input_files:
- raise ValueError("No input files provided!")
- self.input_files = input_files
- self.config = config
- self.features = []
-
- def _audio_file_from_path(self, path: str) -> str:
- """Return the audio file path given a video file path
-
- Example:
- - in = "/path/to/video.mp4"
- - out = "/tmp/video.mp4.wav"
- """
- OUTPUT_DIR = "/tmp"
- return f"{OUTPUT_DIR}/{os.path.basename(path)}.wav"
-
- def _loudnorm(self, audio_file):
- """Run pyloudnorm on the audio file"""
- data, rate = soundfile.read(audio_file) # load audio (with shape (samples, channels))
- meter = pyloudnorm.Meter(rate=rate, block_size=0.3)  # create BS.1770 loudness meter
-
- loudnesses = []
- loudness_features = []
- window_size = int(rate * 0.5) # 500ms
- stride_size = int(rate * 0.5) # 500ms -- no overlap
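- # note: each analysis window must be at least as long as the meter's block_size (0.3s),
- # otherwise pyloudnorm cannot compute an integrated loudness for it -- 0.5s windows are fine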
-
- # for w in range(data.shape[0]//100):
- # loudnesses.append(meter.integrated_loudness(data[w:w+int(0.3*rate),0:2]))
- for w in range(0, len(data)-window_size, stride_size):
- window = data[w:w+window_size, 0:2] # extract window
- loudnesses.append( (w/rate, meter.integrated_loudness(window)) )
-
- for timecode, loudval in sorted([l for l in loudnesses if float(l[1]) != float("-inf")], key=lambda x: x[1], reverse=True):
- # print(f"Timecode: {timecode}, Loudness: {loudval}")
- loudness_features.append((timecode, round(loudval, 3))) # round to 3 DP
-
- return loudness_features
-
- def setup(self):
- """extract audio from video files to be processed by pyloudnorm
-
- TODO: config -- hardcoded for now
- """
- # pyloudnorm expects WAV files
- for file in self.input_files:
- audio_file = self._audio_file_from_path(file.path)
- # ffmpeg -i input.mp4 -vn -acodec pcm_s16le output.wav
- subprocess.run(["ffmpeg", "-y", "-i", file.path, "-vn", "-acodec", "pcm_s16le", audio_file],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
- def run(self):
- """Use pyloudnorm to detect loud audio"""
- for file in self.input_files:
- audio_file = self._audio_file_from_path(file.path)
- loudnesses = self._loudnorm(audio_file)
- for time, loudness in loudnesses:
- self.features.append(Feature(interval=Interval(start=time, duration=0.500),
- source=file, feature_extractor="loudness",
- score=loudness))
-
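- # NOTE: the class docstring above promises that teardown() cleans up the temporary
- # WAV files created in setup(), but no teardown was present in this listing. The
- # method below is an assumed, illustrative sketch (gated on a hypothetical
- # config["clean_up"] flag), not the original implementation.
- def teardown(self):
-     """Remove temporary audio files extracted during setup (if config allows)"""
-     if self.config and self.config.get("clean_up", False):  # "clean_up" is an assumed config key
-         for file in self.input_files:
-             audio_file = self._audio_file_from_path(file.path)
-             if os.path.exists(audio_file):
-                 os.remove(audio_file)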
-
- class VideoActivityFeatureExtractor(FeatureExtractor):
- """Feature extractor for video activity detection.
-
- This class is responsible for extracting features corresponding to high activity in video files.
-
- Uses ffmpeg's scdet filter with threshold of zero.
-
- Here:
-
- setup() is used to validate input files & config
-
- run() is used to extract features from the video using ffmpeg's scdet filter
-
- teardown() is used to clean up any temporary files created during setup according to the config
-
- #TODO: minimum duration -- consider whether to do here, or expand duration post-consolidation
- """
- def __init__(self, input_files=None, config=None):
- if not input_files:
- raise ValueError("No input files provided!")
- self.input_files = input_files
- self.config = config
- self.features = []
-
- def _scdet(self, video_file):
- """Run scdet filter on the video file"""
- ffmpeg_cmd = ["ffmpeg", "-i", video_file, "-vf", "scdet=threshold=0", "-f", "null", "-"]
- # output is of the form:
- # [scdet @ 0x7f0798003d00] lavfi.scd.score: 0.031, lavfi.scd.time: 23.65
- # [scdet @ 0x7f0798003d00] lavfi.scd.score: 0.006, lavfi.scd.time: 23.70
- # capture output, extract time & score
- scdet_output = subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stderr.decode("utf-8")
- # extract time & score
- scores = []
- for line in scdet_output.splitlines():
- if "lavfi.scd.score" in line:
- # note: appended as (time, score)
- scores.append((float(line.split(",")[1].split(":")[1]),
- float(line.split(",")[0].split(":")[1])))
- return scores
-
- def _drop_lowest(self, scores, percent=33):
- """Drop the lowest n% scores from the list"""
- scores = sorted(scores, key=lambda x: x[1], reverse=True)
- return scores[:int(len(scores) * (percent / 100))]
-
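- # NOTE: run() below calls self._nonoverlap_mean(), which is not defined anywhere in
- # this listing. The helper sketched here is an assumed implementation -- average the
- # scdet scores over non-overlapping 500ms windows -- and may differ from the original.
- def _nonoverlap_mean(self, scores, window=0.500):
-     """Mean scdet score per non-overlapping window, as (window_start_time, mean) tuples"""
-     means = []
-     if not scores:
-         return means
-     window_start = 0.0
-     last_time = scores[-1][0]
-     while window_start <= last_time:
-         in_window = [score for time, score in scores
-                      if window_start <= time < window_start + window]
-         if in_window:
-             means.append((window_start, sum(in_window) / len(in_window)))
-         window_start += window
-     return means
-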
- def setup(self):
- pass
-
- def run(self):
- for file in self.input_files:
- scores = self._scdet(file.path)
- means = sorted(self._nonoverlap_mean(scores), key=lambda x: x[1], reverse=True)
- for time, score in self._drop_lowest(means, 66):
- self.features.append(Feature(interval=Interval(start=time, duration=0.500),
- source=file, feature_extractor="videoactivity",
- score=score))
-
- def teardown(self):
- pass
-
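- # Illustrative usage sketch (not part of the original listing). The SourceMedia
- # constructor call below is an assumption -- check pipeline.utils for its real
- # signature before relying on this.
- if __name__ == "__main__":
-     logging.basicConfig(level=logging.DEBUG)
-     source = SourceMedia(files=["/path/to/video.mp4"])  # hypothetical constructor
-     extractor = LoudAudioFeatureExtractor(input_files=source)
-     extractor.setup()     # extract audio to /tmp as WAV
-     extractor.run()       # populate extractor.features with loudness Features
-     extractor.teardown()  # no-op unless a clean-up config is supplied
-     for feature in extractor.features:
-         print(feature)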