|
|
@@ -18,6 +18,8 @@ logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
class FeatureExtractor(ABC): |
|
|
|
"""Feature extractor interface.""" |
|
|
|
_CONFIG_DEFAULT_NUM_FEATURES = 5 # default number of features to keep |
|
|
|
|
|
|
|
def _run_get_output(self, cmd: list, cwd:str=".") -> str: |
|
|
|
"""Run a command and return the output as a string |
|
|
|
|
|
|
@@ -25,6 +27,40 @@ class FeatureExtractor(ABC): |
|
|
|
""" |
|
|
|
return subprocess.run(cmd, stdout=subprocess.PIPE, cwd=cwd).stdout.decode("utf-8") |
|
|
|
|
|
|
|
def _keep_num(self, features, num=_CONFIG_DEFAULT_NUM_FEATURES, trim_overlap=False) -> list:
    """Select up to ``num`` of the highest-scoring features.

    Candidates are ranked by ``score`` (highest first) and taken one at a
    time until ``num`` have been kept or no candidates remain.

    A kept feature whose interval is shorter than ``self._min_duration`` is
    widened: its start is moved back by half of ``self._min_duration`` and,
    if the interval is still too short afterwards, its duration is set to
    ``self._min_duration``. The interval objects are modified in place.

    When ``trim_overlap`` is true, every remaining candidate whose interval
    overlaps the (possibly widened) interval that was just kept is dropped.

    Each feature is a Feature object, with an Interval object.

    Returns the kept features, best score first.
    """
    # Rank a copy so the caller's sequence is never reordered.
    ranked = sorted(features, key=lambda feat: feat.score, reverse=True)
    selected = []

    while len(selected) < num and ranked:
        # Take the best remaining candidate off the front of the ranking.
        best, ranked = ranked[0], ranked[1:]

        # Widen short features, shifting the start back first so the
        # original start stays roughly in the middle of the interval.
        if self._min_duration > best.interval.duration:
            best.interval.move_start(-0.5 * self._min_duration, relative=True)
            if best.interval.duration < self._min_duration:
                best.interval.update_duration(self._min_duration)

        selected.append(best)

        if trim_overlap:
            # Discard candidates that the feature just kept already covers.
            ranked = [
                other for other in ranked
                if not other.interval.overlaps(best.interval)
            ]

    return selected
|
|
|
|
|
|
|
def setup(self): |
|
|
|
"""Setup the feature extractor -- validate input files & config""" |
|
|
|
|
|
|
@@ -254,40 +290,6 @@ class LoudAudioFeatureExtractor(FeatureExtractor): |
|
|
|
|
|
|
|
return loudness_features |
|
|
|
|
|
|
|
def _keep_num(self, features, num=_CONFIG_DEFAULT_NUM_FEATURES, trim_overlap=False) -> list:
    """Keep the top n features (default: 5).

    Approach:
    - rank the features by score, highest first
    - until n features are kept or no candidates remain:
        + expand the top remaining feature to the minimum duration
          (move start back by 0.5*min_duration, then lengthen the interval
          to min_duration if it is still too short)
        + drop any remaining features that overlap that feature's
          interval (only when ``trim_overlap`` is true)
    - return the kept features, best score first

    Each feature is a Feature object, with an Interval object. The
    intervals of kept features are modified in place.

    Fewer than ``num`` features are returned when the input (after optional
    overlap trimming) does not contain that many.
    """
    keep_features = []
    # ensure features are sorted by score (on a copy of the input)
    candidates = sorted(features, key=lambda x: x.score, reverse=True)

    # Stop when candidates run out: popping a fixed `num` times raised
    # IndexError whenever fewer than `num` features were available.
    while candidates and len(keep_features) < num:
        current_feature = candidates.pop(0)

        # expand the feature to min_duration - try and keep centered at current start
        if self._min_duration > current_feature.interval.duration:
            current_feature.interval.move_start(-0.5 * self._min_duration, relative=True)
            if current_feature.interval.duration < self._min_duration:
                current_feature.interval.update_duration(self._min_duration)

        keep_features.append(current_feature)

        if trim_overlap:
            # Drop the overlapping candidates. The previous filter lacked the
            # `not` and so kept only the overlapping ones.
            candidates = [
                f for f in candidates
                if not f.interval.overlaps(current_feature.interval)
            ]

    return keep_features
|
|
|
|
|
|
|
def setup(self): |
|
|
|
"""extract audio from video files to be processed by pyloudnorm |
|
|
|
|
|
|
@@ -393,40 +395,6 @@ class VideoActivityFeatureExtractor(FeatureExtractor): |
|
|
|
scores = sorted(scores, key=lambda x: x[1], reverse=True) |
|
|
|
return scores[:int(len(scores) * (percent / 100))] |
|
|
|
|
|
|
|
def _keep_num(self, features, num=_CONFIG_DEFAULT_NUM_FEATURES, trim_overlap=False) -> list:
    """Keep the top n features (default: 5).

    Approach:
    - rank the features by score, highest first
    - until n features are kept or no candidates remain:
        + expand the top remaining feature to the minimum duration
          (move start back by 0.5*min_duration, then lengthen the interval
          to min_duration if it is still too short)
        + drop any remaining features that overlap that feature's
          interval (only when ``trim_overlap`` is true)
    - return the kept features, best score first

    Each feature is a Feature object, with an Interval object. The
    intervals of kept features are modified in place.

    Fewer than ``num`` features are returned when the input (after optional
    overlap trimming) does not contain that many.
    """
    keep_features = []
    # ensure features are sorted by score (on a copy of the input)
    remaining = sorted(features, key=lambda x: x.score, reverse=True)

    # Bound the loop by the candidates actually left; a fixed
    # `for i in range(num)` popped from an empty list and raised IndexError
    # when fewer than `num` features were available.
    while remaining and len(keep_features) < num:
        current_feature = remaining.pop(0)

        # expand the feature to min_duration - try and keep centered at current start
        if self._min_duration > current_feature.interval.duration:
            current_feature.interval.move_start(-0.5 * self._min_duration, relative=True)
            if current_feature.interval.duration < self._min_duration:
                current_feature.interval.update_duration(self._min_duration)

        keep_features.append(current_feature)

        if trim_overlap:
            # Keep only candidates that do NOT overlap the kept feature. The
            # earlier condition was inverted and discarded every
            # non-overlapping candidate.
            remaining = [
                f for f in remaining
                if not f.interval.overlaps(current_feature.interval)
            ]

    return keep_features
|
|
|
|
|
|
|
def setup(self): |
|
|
|
pass |
|
|
|
|
|
|
|