|
|
@@ -1,9 +1,13 @@ |
|
|
|
from abc import ABC |
|
|
|
import logging |
|
|
|
import os |
|
|
|
import random |
|
|
|
import subprocess |
|
|
|
from ast import literal_eval |
|
|
|
from pipeline.utils import SourceMedia, Feature, Interval |
|
|
|
# for loudness detection |
|
|
|
import soundfile |
|
|
|
import pyloudnorm |
|
|
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
@@ -175,6 +179,68 @@ class LoudAudioFeatureExtractor(FeatureExtractor): |
|
|
|
|
|
|
|
teardown() is used to clean up temporary files created during setup (if specified by config) |
|
|
|
""" |
|
|
|
def __init__(self, input_files=None, config=None): |
|
|
|
if not input_files: |
|
|
|
raise ValueError("No input files provided!") |
|
|
|
self.input_files = input_files |
|
|
|
self.config = config |
|
|
|
self.features = [] |
|
|
|
|
|
|
|
def _audio_file_from_path(self, path: str) -> str: |
|
|
|
"""Return the audio file path given a video file path |
|
|
|
|
|
|
|
Example: |
|
|
|
- in = "/path/to/video.mp4" |
|
|
|
- out = "/tmp/video.mp4.wav" |
|
|
|
""" |
|
|
|
OUTPUT_DIR = "/tmp" |
|
|
|
return f"{OUTPUT_DIR}/{os.path.basename(path)}.wav" |
|
|
|
|
|
|
|
def _loudnorm(self, audio_file):
    """Measure loudness over consecutive windows of an audio file.

    The file is loaded with soundfile and cut into 500 ms windows with a
    500 ms stride (no overlap). Each full window is measured with a
    pyloudnorm meter using a 300 ms block size. Windows whose measured
    loudness is -inf are discarded.

    Args:
        audio_file: path of an audio file readable by soundfile.

    Returns:
        A list of ``(timecode, loudness)`` tuples ordered loudest first.
        ``timecode`` is the window start in seconds and ``loudness`` is the
        meter reading rounded to 3 decimal places. The list is empty when
        the audio is shorter than one window.
    """
    # soundfile returns a 1-D array for mono files and (samples, channels)
    # otherwise.
    data, rate = soundfile.read(audio_file)
    meter = pyloudnorm.Meter(rate=rate, block_size=0.3)  # create BS.1770 meter

    window_size = int(rate * 0.5)  # 500ms
    stride_size = int(rate * 0.5)  # 500ms -- no overlap

    loudnesses = []
    # "+ 1" so that a final window ending exactly on the last sample is
    # measured instead of being silently dropped.
    for w in range(0, len(data) - window_size + 1, stride_size):
        if data.ndim == 1:
            # Mono: there is no channel axis to slice.
            window = data[w:w + window_size]
        else:
            # Multi-channel: keep only the first two channels.
            window = data[w:w + window_size, 0:2]
        loudnesses.append((w / rate, meter.integrated_loudness(window)))

    # Drop windows measured as -inf, then order loudest first. The sort is
    # stable, so equally loud windows stay in chronological order.
    audible = [entry for entry in loudnesses if float(entry[1]) != float("-inf")]
    audible.sort(key=lambda entry: entry[1], reverse=True)

    return [(timecode, round(loudval, 3)) for timecode, loudval in audible]  # 3 DP
|
|
|
|
|
|
|
def setup(self):
    """Extract the audio track of every input file to a WAV file.

    Runs ffmpeg once per input file, writing 16-bit PCM audio to the path
    returned by ``_audio_file_from_path()`` and overwriting any existing
    file there. A conversion that exits with a non-zero status is logged
    and the remaining files are still processed.

    TODO: config -- hardcoded for now
    """
    # pyloudnorm expects WAV files
    for file in self.input_files:
        audio_file = self._audio_file_from_path(file.path)
        # ffmpeg -i input.mp4 -vn -acodec pcm_s16le output.wav
        result = subprocess.run(
            ["ffmpeg", "-y", "-i", file.path, "-vn", "-acodec", "pcm_s16le", audio_file],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode != 0:
            # Report the failure here, where the cause (ffmpeg's stderr) is
            # still available, rather than leaving it to show up later as a
            # missing or unreadable WAV file.
            logger.error(
                "ffmpeg failed (exit status %s) extracting audio from %s to %s: %s",
                result.returncode, file.path, audio_file,
                result.stderr.decode(errors="replace").strip())
|
|
|
|
|
|
|
def run(self):
    """Turn per-window loudness measurements into features.

    For each input file, the WAV path given by ``_audio_file_from_path()``
    is measured with ``_loudnorm()``; every ``(time, loudness)`` result is
    appended to ``self.features`` as a Feature covering a 0.5 s interval
    starting at that time, scored with the loudness value.
    """
    for source in self.input_files:
        wav_path = self._audio_file_from_path(source.path)
        for start, score in self._loudnorm(wav_path):
            interval = Interval(start=start, duration=0.500)
            feature = Feature(interval=interval, source=source,
                              feature_extractor="loudness", score=score)
            self.features.append(feature)
|
|
|
|
|
|
|
|
|
|
|
class VideoActivityFeatureExtractor(FeatureExtractor): |
|
|
|
"""Feature extractor for video activity detection. |
|
|
|
|
|
|
|