|
- from abc import ABC
- import logging
- import random
- import subprocess
- from ast import literal_eval
- from pipeline.utils import SourceMedia, Feature, Interval
-
- logger = logging.getLogger(__name__)
-
class FeatureExtractor(ABC):
    """Base interface shared by all feature extractors.

    Exposes three hooks -- setup(), run() and teardown() -- each of which
    does nothing here and returns None. Subclasses override the hooks
    they need.
    """
    # TODO: #API -- decide if .features will be a member variable

    def setup(self):
        """Prepare the extractor; the base implementation is a no-op."""

    def run(self):
        """Extract features; the base implementation is a no-op."""

    def teardown(self):
        """Clean up after extraction; the base implementation is a no-op."""
-
class LaughterFeatureExtractor(FeatureExtractor):
    """Feature extractor for laughter detection.

    This class is responsible for extracting features corresponding to
    laughter in media files.

    Here:

    setup() is used to validate input files & config, which may involve
    processing video files to extract audio

    run() is used to extract features from the audio using jrgillick's
    laughter-detection

    teardown() is used to clean up any temporary files created during setup
    according to the config

    See: https://github.com/jrgillick/laughter-detection for the
    laughter-detection library
    """

    # Location of the laughter-detection checkout and its entry-point script.
    # Kept as class attributes so a subclass or test can point them elsewhere
    # (the original code mentions "tests/fake_segment_laughter.py" as a fake
    # producer of output for testing).
    # TODO: take these from config instead of a developer-specific path
    LAUGH_DETECTOR_DIR = "/home/robert/mounts/980data/code/laughter-detection/"
    LAUGH_DETECTOR_SCRIPT = "segment_laughter.py"

    # Detected laughs are reported on stdout lines that start with this prefix,
    # followed by a 2-tuple of floats.
    INSTANCE_PREFIX = "instance: "

    # Seconds added before / after each detected laugh (see _adjust_features).
    PREPEND = 7.0
    APPEND = 3.0

    def __init__(self, input_files=None, config=None):
        """Store the inputs and start with an empty feature list.

        It is expected that input_files is a SourceMedia object.
        """
        self.input_files = input_files
        self.config = config
        self.features = []

    def _laughdetect(self, audio_file):
        """Run laughter detection on the audio file.

        Args:
            audio_file: path of the media file to analyse. It is made
                absolute before being handed to the detector, because the
                detector runs with a different working directory.

        Returns:
            A list with one evaluated literal per "instance: " line printed
            by the detector (expected to be (start, end) tuples of floats).
            Lines that cannot be parsed are skipped with a warning.
        """
        # Local import so this block does not depend on the file's import list.
        import os

        # The subprocess runs with cwd=LAUGH_DETECTOR_DIR, so a relative path
        # would otherwise be resolved against the wrong directory.
        audio_path = os.path.abspath(audio_file)
        script_path = os.path.join(self.LAUGH_DETECTOR_DIR,
                                   self.LAUGH_DETECTOR_SCRIPT)
        command = ["python", script_path, f"--input_audio_file={audio_path}"]

        # run command, capture output, ignore exit status
        # ↑ have to include cwd to keep laughter-detection imports happy
        # also, it isn't happy if no output dir is specified but we get laughs
        # so it's grand
        completed = subprocess.run(command,
                                   stdout=subprocess.PIPE,
                                   cwd=self.LAUGH_DETECTOR_DIR,
                                   check=False)
        if completed.returncode != 0:
            # Deliberately not an error (see above); recorded for debugging.
            logger.debug("laughter detection exited with status %s for %s",
                         completed.returncode, audio_path)
        output = completed.stdout.decode("utf-8")

        laughs = []
        prefix_len = len(self.INSTANCE_PREFIX)
        for line in output.splitlines():
            if not line.startswith(self.INSTANCE_PREFIX):
                continue
            try:
                laughs.append(literal_eval(line[prefix_len:]))
            except (ValueError, SyntaxError):
                # One malformed line should not discard every other laugh.
                logger.warning(
                    "Skipping unparseable laughter-detection line: %r", line)
        return laughs

    def _adjust_features(self, features=None):
        """Adjust features according to config.

        Generically, this ensures features conform to config - min/max
        feature length, etc.

        In the context of LaughterFeatureExtractor, there is some secret
        sauce: things that cause a laugh generally /precede/ the laugh, so we
        want more time before the detected start than at the end. For
        example, for a minimum feature length of 15s, we might prepend 10
        seconds and append 5 seconds, or 12s and 3s. We may wish to do this
        pre/post adjustment for all laughter features found, regardless of
        length.

        Args:
            features: the features to adjust in place. Defaults to every
                feature in self.features.

        TODO: figure out how we're going to handle length adjustments
        TODO: config for length adjustments per design doc
        TODO: play with numbers more to see what works best
        """
        if features is None:
            features = self.features

        for feature in features:
            # do the pre & post adjustment
            # NOTE(review): whether Interval clamps a start that would go
            # below zero is not visible from here -- confirm.
            feature.interval.move_start(-self.PREPEND, relative=True)
            feature.interval.move_end(self.APPEND, relative=True)

    def setup(self):
        """Setup the laughter feature extractor -- validate input files & config.

        jrgillick's laughter-detection library can work with AV files directly.

        Raises:
            ValueError: if no input files were provided.

        TODO: validate input files
        TODO: handle config
        """
        logger.debug("LaughterFeatureExtractor setup")

        # Validate input files
        if not self.input_files:
            raise ValueError("No input files provided")

        # TODO: convert video to audio if needed

    def run(self):
        """Extract laughter features for each input file.

        Appends one Feature per detected laugh to self.features, then applies
        the pre/post padding to the features created by this call only, so
        features from an earlier run() are not padded a second time.
        """
        if not self.input_files:
            return

        first_new = len(self.features)
        for file in self.input_files:
            for start, end in self._laughdetect(file.path):
                self.features.append(
                    Feature(interval=Interval(start=start, end=end),
                            source="laughter", path=file.path))
            # TODO: implement options eg minimum feature length

        # adjust only the features added above
        self._adjust_features(self.features[first_new:])

    def teardown(self):
        """Nothing to clean up yet."""
-
class RandomFeatureExtractor(FeatureExtractor):
    """Feature extractor for random feature generation.

    This class is responsible for generating random features for testing
    purposes.

    Here:

    setup() is used to validate input files & config

    run() is used to generate random features

    teardown() is used to clean up any temporary files created during setup
    according to the config
    """
    # Number of random features generated per input file.
    NUM_FEATURES = 5
    # Upper bound on the length of a generated feature.
    MAX_DURATION = 20.0

    def __init__(self, input_files=None, config=None):
        """Store the inputs and start with an empty feature list.

        It is expected that input_files is a SourceMedia object.
        """
        self.input_files = input_files
        self.config = config
        self.features = []

    def setup(self):
        """Setup the random feature extractor -- validate input files & config.

        Raises:
            ValueError: if no input files were provided.
        """
        logger.debug("RandomFeatureExtractor setup")

        # Validate input files
        if not self.input_files:
            raise ValueError("No input files provided")

    def run(self):
        """Generate random features for each input file.

        For every file, NUM_FEATURES features are appended to self.features.
        Each feature's duration is capped by both MAX_DURATION and the file's
        duration, and its start is chosen so the whole interval lies between
        0 and the file's duration.

        Raises:
            ValueError: if input_files is empty or is not a SourceMedia.
        """
        # check self.input_files is of type SourceMedia
        if not self.input_files or not isinstance(self.input_files, SourceMedia):
            raise ValueError("No input files provided")

        for file in self.input_files:
            # Looked up once per file rather than once per feature.
            # assumes duration() returns a non-negative number in the same
            # unit as MAX_DURATION -- TODO confirm
            file_duration = file.duration()
            longest = min(self.MAX_DURATION, file_duration)

            for _ in range(self.NUM_FEATURES):
                duration = random.random() * longest
                # Parenthesised on purpose: the start is drawn from the room
                # left after the feature's own length, so it is never negative
                # and the feature never runs past the end of the file.
                start = random.random() * (file_duration - duration)
                self.features.append(
                    Feature(interval=Interval(start=start, duration=duration),
                            source="random", path=file.path))

    def teardown(self):
        """Nothing to clean up."""
|