@@ -0,0 +1,156 @@ | |||||
"""adjusters.py -- adjust the gathered Features | |||||
This is usually done to either modify to reduce the Features in some way. | |||||
For example: | |||||
- TargetTimeAdjuster: drop Features until the target time is reached | |||||
- FeatureCountAdjuster: drop Features until the target number of Features is reached | |||||
TODO: Consider eg a generic PredicateAdjuster -- supply a predicate/lambda that will be used to determine whether to keep a Feature or not. | |||||
""" | |||||
from enum import Enum | |||||
class Adjuster(): | |||||
"""Generic Adjuster class. Expects a list of Features and returns a list of Features.""" | |||||
def __init__(self, features: list=[]): | |||||
"""Initialize the Adjuster with Features. | |||||
NOTE: an empty feature list is permitted, since a FeatureExtractor may not produce features. Adjusters subclassing should be aware of this. | |||||
""" | |||||
self.features = features | |||||
def adjust(self) -> list: | |||||
"""Adjust the Features. Override this method in subclasses.""" | |||||
return self.features | |||||
class TargetTimeAdjuster(Adjuster): | |||||
"""Adjuster that drops Features until the target time is reached.""" | |||||
_STRATEGY = Enum("MarginStrategy", ["ABSOLUTE", "PERCENT"]) | |||||
_DEFAULT_TARGET_TIME = 60.0 # 1 minute | |||||
_DEFAULT_MARGIN = 10 # can be percent or absolute value | |||||
def _determine_margin(self, time: float, margin: float, strategy: _STRATEGY) -> tuple: | |||||
"""Determine the target time margins. | |||||
If the strategy is ABSOLUTE, the margin is a fixed value in seconds. | |||||
If the strategy is PERCENT, the margin is a percentage of the target time. | |||||
Returns a tuple of (min, max) times. | |||||
Pulled out for unit testing | |||||
""" | |||||
target_time_min = target_time_max = None | |||||
if strategy == self._STRATEGY.ABSOLUTE: | |||||
# both specified in seconds | |||||
target_time_min = time - margin | |||||
target_time_max = time + margin | |||||
elif strategy == self._STRATEGY.PERCENT: | |||||
target_time_max = time + (time * margin / 100) | |||||
target_time_min = time - (time * margin / 100) | |||||
# ensure we don't have negative times | |||||
if type(target_time_min) is float and target_time_min < 0: | |||||
target_time_min = 0.0 | |||||
return (target_time_min, target_time_max) | |||||
def _features_total_time(self, features: list) -> float: | |||||
"""Calculate the total duration of all Features. | |||||
Returns the total time in seconds. | |||||
Pulled out for unit testing. | |||||
""" | |||||
return float(sum([x.interval.duration for x in features])) | |||||
def _sort_by_score_time(self, features: list) -> list: | |||||
"""Sort Features by score (primary) and by time (secondary). | |||||
Returns a sorted list of Features. | |||||
Pulled out for unit testing as RDH was having issues with adjust() | |||||
and wanted to verify sorting was working correctly. | |||||
""" | |||||
return sorted(features, key=lambda x: (x.score, x.interval.duration)) | |||||
def __init__(self, features: list=[], | |||||
target_time: int|float=_DEFAULT_TARGET_TIME, | |||||
margin: int|float=_DEFAULT_MARGIN, | |||||
strategy=_STRATEGY.ABSOLUTE): | |||||
"""Initialize the Adjuster with Features and a target time. | |||||
Default target time is 60 seconds (1 minute). Even if the desired target time is 60s exactly, it is recommended to specify it explicitly. | |||||
""" | |||||
super().__init__(features) | |||||
self.target_time = float(target_time) | |||||
self.margin = float(margin) | |||||
self.strategy = strategy | |||||
def adjust(self) -> list: | |||||
"""Drop Features until the target time within the margin is reached. Prioritise dropping lower scoring Features. | |||||
Approach: | |||||
Sort list of Features by score (primary) and by time (secondary). | |||||
Drop lowest scoring Features until the target time is reached; | |||||
if dropping a Feature would result in missing the margin, skip dropping that Feature | |||||
if no Features can be dropped without missing the margin, | |||||
drop the lowest scoring Feature until we are under the target time (with margin) | |||||
Returns a list of Features, and also modifies the internal list of Features. | |||||
""" | |||||
# check for early exit | |||||
if not self.features: | |||||
return [] | |||||
# figure out our margins | |||||
target_time_min, target_time_max = self._determine_margin(self.target_time, self.margin, self.strategy) | |||||
# calculate total time of all Features | |||||
total_time = self._features_total_time(features=self.features) | |||||
# if we are already within the target time, return the Features as-is | |||||
if total_time <= target_time_max: | |||||
return self.features | |||||
# sort list of Features by score (primary) and by duration (secondary) | |||||
sorted_features = self._sort_by_score_time(self.features) | |||||
drop_indices = [] # indices of Features to drop | |||||
# first pass- drop lowest scoring Features until we are within the target time | |||||
for i in range(len(sorted_features)): | |||||
# check if dropping this Feature would put us in the target range: | |||||
# if so, drop it and return | |||||
if (total_time - sorted_features[i].interval.duration >= target_time_min and | |||||
total_time - sorted_features[i].interval.duration <= target_time_max): | |||||
drop_indices.append(i) | |||||
break | |||||
elif (total_time - sorted_features[i].interval.duration > target_time_max): | |||||
drop_indices.append(i) | |||||
total_time -= sorted_features[i].interval.duration | |||||
for i in drop_indices: | |||||
self.features.remove(sorted_features[i]) | |||||
# if we are now within the target time, return the Features | |||||
total_time = self._features_total_time(features=self.features) | |||||
if total_time <= target_time_max: | |||||
return self.features | |||||
# else: we are still over the target time | |||||
# so drop the lowest scoring Features until we are UNDER the target time | |||||
for i in range(len(sorted_features)): | |||||
self.features.remove(sorted_features[i]) | |||||
total_time -= sorted_features[i].interval.duration | |||||
if total_time <= target_time_max: | |||||
break | |||||
return self.features |
@@ -199,14 +199,18 @@ class LoudAudioFeatureExtractor(FeatureExtractor): | |||||
teardown() is used to clean up temporary files created during setup (if specified by config) | teardown() is used to clean up temporary files created during setup (if specified by config) | ||||
""" | """ | ||||
_CONFIG_DEFAULT_NUM_FEATURES = 5 # keep the top 5 loudnesses | |||||
def __init__(self, input_files=None, config=None, num_features=_CONFIG_DEFAULT_NUM_FEATURES): | |||||
_CONFIG_DEFAULT_NUM_FEATURES = 15 # keep the top 5 loudnesses | |||||
_CONFIG_DEFAULT_MIN_DURATION = 5.00 # seconds | |||||
def __init__(self, input_files=None, config=None, | |||||
num_features=_CONFIG_DEFAULT_NUM_FEATURES, | |||||
min_duration=_CONFIG_DEFAULT_MIN_DURATION): | |||||
if not input_files: | if not input_files: | ||||
raise ValueError("No input files provided!") | raise ValueError("No input files provided!") | ||||
self.input_files = input_files | self.input_files = input_files | ||||
self.config = config | self.config = config | ||||
self.features = [] | self.features = [] | ||||
self._num_features = num_features | self._num_features = num_features | ||||
self._min_duration = min_duration | |||||
def _audio_file_from_path(self, path: str) -> str: | def _audio_file_from_path(self, path: str) -> str: | ||||
"""Return the audio file path given a video file path | """Return the audio file path given a video file path | ||||
@@ -250,9 +254,33 @@ class LoudAudioFeatureExtractor(FeatureExtractor): | |||||
return loudness_features | return loudness_features | ||||
def _keep_num(self, loudnesses, num=_CONFIG_DEFAULT_NUM_FEATURES) -> list: | |||||
"""Keep the top n loudnesses (default: 5)""" | |||||
return sorted(loudnesses, key=lambda x: x[1], reverse=True)[:num] | |||||
def _keep_num(self, features, num=_CONFIG_DEFAULT_NUM_FEATURES, margin=10.0) -> list: | |||||
"""Keep the top n features (default: 5) | |||||
Approach: | |||||
- for range in 0-n | |||||
+ expand the nth top feature to min duration | |||||
(move start back by 0.5*min_duration, end forward by 0.5*min_duration) | |||||
+ drop any features that are now in that feature's range (plus margin) | |||||
- return the top n features | |||||
Each feature is a Feature object, with an Interval object | |||||
""" | |||||
keep_features = [] | |||||
# ensure features are sorted by score | |||||
features = sorted(features, key=lambda x: x.score, reverse=True) | |||||
for i in range(num): | |||||
current_feature = features.pop(0) | |||||
# expand the feature to min_duration | |||||
current_feature.interval.move_start(-0.5*self._min_duration, relative=True) | |||||
current_feature.interval.move_end(0.5*self._min_duration, relative=True) | |||||
keep_features.append(current_feature) | |||||
# drop any features that are now in that feature's range (plus margin) | |||||
features = [f for f in features if | |||||
(f.interval.end < current_feature.interval.start-margin or | |||||
f.interval.start > current_feature.interval.end+margin)] | |||||
return keep_features | |||||
def setup(self): | def setup(self): | ||||
"""extract audio from video files to be processed by pyloudnorm | """extract audio from video files to be processed by pyloudnorm | ||||
@@ -271,11 +299,14 @@ class LoudAudioFeatureExtractor(FeatureExtractor): | |||||
for file in self.input_files: | for file in self.input_files: | ||||
audio_file = self._audio_file_from_path(file.path) | audio_file = self._audio_file_from_path(file.path) | ||||
loudnesses = self._loudnorm(audio_file) | loudnesses = self._loudnorm(audio_file) | ||||
top_loudnesses = self._keep_num(loudnesses, self._num_features) | |||||
for time, loudness in top_loudnesses: | |||||
self.features.append(Feature(interval=Interval(start=time, duration=0.500), | |||||
features = [] | |||||
for time, loudness in loudnesses: | |||||
features.append(Feature(interval=Interval(start=time, duration=0.500), | |||||
source=file, feature_extractor="loudness", | source=file, feature_extractor="loudness", | ||||
score=loudness)) | score=loudness)) | ||||
# prune features list to keep self.num_features | |||||
self.features = self._keep_num(features, self._num_features) | |||||
class VideoActivityFeatureExtractor(FeatureExtractor): | class VideoActivityFeatureExtractor(FeatureExtractor): | ||||
@@ -295,12 +326,18 @@ class VideoActivityFeatureExtractor(FeatureExtractor): | |||||
#TODO: minimum duration -- consider whether to do here, or expand duration post-consolidation | #TODO: minimum duration -- consider whether to do here, or expand duration post-consolidation | ||||
""" | """ | ||||
def __init__(self, input_files=None, config=None): | |||||
_CONFIG_DEFAULT_NUM_FEATURES = 15 # keep the top 5 activity moments | |||||
_CONFIG_DEFAULT_MIN_DURATION = 5.00 # seconds | |||||
def __init__(self, input_files=None, config=None, | |||||
num_features=_CONFIG_DEFAULT_NUM_FEATURES, | |||||
min_duration=_CONFIG_DEFAULT_MIN_DURATION): | |||||
if not input_files: | if not input_files: | ||||
raise ValueError("No input files provided!") | raise ValueError("No input files provided!") | ||||
self.input_files = input_files | self.input_files = input_files | ||||
self.config = config | self.config = config | ||||
self.features = [] | self.features = [] | ||||
self._num_features = num_features | |||||
self._min_duration = min_duration | |||||
def _scdet(self, video_file): | def _scdet(self, video_file): | ||||
"""Run scdet filter on the video file""" | """Run scdet filter on the video file""" | ||||
@@ -346,6 +383,35 @@ class VideoActivityFeatureExtractor(FeatureExtractor): | |||||
scores = sorted(scores, key=lambda x: x[1], reverse=True) | scores = sorted(scores, key=lambda x: x[1], reverse=True) | ||||
return scores[:int(len(scores) * (percent / 100))] | return scores[:int(len(scores) * (percent / 100))] | ||||
def _keep_num(self, features, num=_CONFIG_DEFAULT_NUM_FEATURES, margin=10.0) -> list: | |||||
"""Keep the top n features (default: 5) | |||||
Approach: | |||||
- for range in 0-n | |||||
+ expand the nth top feature to min duration | |||||
(move start back by 0.5*min_duration, end forward by 0.5*min_duration) | |||||
+ drop any features that are now in that feature's range (plus margin) | |||||
- return the top n features | |||||
Each feature is a Feature object, with an Interval object | |||||
""" | |||||
keep_features = [] | |||||
# ensure features are sorted by score | |||||
features = sorted(features, key=lambda x: x.score, reverse=True) | |||||
for i in range(num): | |||||
current_feature = features.pop(0) | |||||
# expand the feature to min_duration | |||||
current_feature.interval.move_start(-0.5*self._min_duration, relative=True) | |||||
current_feature.interval.move_end(0.5*self._min_duration, relative=True) | |||||
keep_features.append(current_feature) | |||||
# drop any features that are now in that feature's range (plus margin) | |||||
features = [f for f in features if | |||||
(f.interval.end < current_feature.interval.start-margin or | |||||
f.interval.start > current_feature.interval.end+margin)] | |||||
return keep_features | |||||
def setup(self): | def setup(self): | ||||
pass | pass | ||||
@@ -353,11 +419,16 @@ class VideoActivityFeatureExtractor(FeatureExtractor): | |||||
for file in self.input_files: | for file in self.input_files: | ||||
scores = self._scdet(file.path) | scores = self._scdet(file.path) | ||||
means = sorted(self._nonoverlap_mean(scores), key=lambda x: x[1], reverse=True) | means = sorted(self._nonoverlap_mean(scores), key=lambda x: x[1], reverse=True) | ||||
features = [] | |||||
for time, score in self._drop_lowest(means, 66): | for time, score in self._drop_lowest(means, 66): | ||||
self.features.append(Feature(interval=Interval(start=time, duration=0.500), | |||||
features.append(Feature(interval=Interval(start=time, duration=0.500), | |||||
source=file, feature_extractor="videoactivity", | source=file, feature_extractor="videoactivity", | ||||
score=score)) | score=score)) | ||||
# prune features list to keep self.num_features | |||||
self.features = self._keep_num(features, self._num_features) | |||||
def teardown(self): | def teardown(self): | ||||
pass | pass | ||||
@@ -7,9 +7,16 @@ class MockInterval(): | |||||
self.end = end | self.end = end | ||||
self.duration = end - start | self.duration = end - start | ||||
@classmethod | |||||
def from_duration(cls, duration): | |||||
return cls(start=0, end=duration) | |||||
def to_json(self): | def to_json(self): | ||||
return {"start": self.start, "end": self.end} | return {"start": self.start, "end": self.end} | ||||
def __eq__(self, other): | |||||
return self.start == other.start and self.end == other.end | |||||
class MockFeature(): | class MockFeature(): | ||||
"""Mock feature object for testing""" | """Mock feature object for testing""" | ||||
def __init__(self, interval, source=None, feature_extractor="mock", score=0.0): | def __init__(self, interval, source=None, feature_extractor="mock", score=0.0): | ||||
@@ -21,6 +28,10 @@ class MockFeature(): | |||||
def to_json(self): | def to_json(self): | ||||
return {"interval": self.interval} | return {"interval": self.interval} | ||||
def __eq__(self, other): | |||||
return (self.interval == other.interval and self.source == other.source | |||||
and self.feature_extractor == other.feature_extractor) | |||||
class MockSource(): | class MockSource(): | ||||
"""Mock Source object for testing Feature""" | """Mock Source object for testing Feature""" | ||||
def __init__(self, source=None, path=None): | def __init__(self, source=None, path=None): | ||||
@@ -0,0 +1,282 @@ | |||||
"""test_adjusters.py -- test pipeline Adjusters (eg TargetTimeAdjuster)""" | |||||
import unittest | |||||
import unittest.mock as mock | |||||
import pipeline.adjusters as adjusters | |||||
from test.mocks import MockFeature, MockInterval | |||||
class TestAdjuster(unittest.TestCase): | |||||
"""Test the generic Adjuster class""" | |||||
def test_init(self): | |||||
"""Test the Adjuster can be initialised""" | |||||
adjuster = adjusters.Adjuster() | |||||
self.assertEqual(adjuster.features, []) | |||||
def test_adjust(self): | |||||
"""Test the generic adjust""" | |||||
adjuster = adjusters.Adjuster() | |||||
self.assertEqual(adjuster.adjust(), []) | |||||
self.assertEqual(adjuster.features, []) | |||||
class TestTargetTimeAdjuster(unittest.TestCase): | |||||
"""Test the TargetTimeAdjuster | |||||
TTA drops Features until the target time is reached (or within a margin)""" | |||||
def test_init(self): | |||||
"""Test the TTA can be initialised""" | |||||
tta = adjusters.TargetTimeAdjuster() | |||||
self.assertEqual(tta.features, []) | |||||
def test_features_total_time(self): | |||||
"""Test the TTA can calculate the total time of Features | |||||
Test: | |||||
- input duration floats: 1.0, 2.0, 3.0, 4.0 == 10.0 | |||||
""" | |||||
tta = adjusters.TargetTimeAdjuster() | |||||
features = [] | |||||
for i in range(1, 5): | |||||
features.append(make_feature(duration=i*1.0)) | |||||
self.assertEqual(tta._features_total_time(features), 10.0) | |||||
self.assertEqual(tta._features_total_time([]), 0.0) | |||||
self.assertIs(type(tta._features_total_time([])), float) | |||||
def test_determine_margin(self): | |||||
"""Test the TTA can determine the target time margins | |||||
Args: time, margin, strategy (strategy in: ABSOLUTE, PERCENT) | |||||
Test: | |||||
- margin of zero | |||||
- margin of 5.0 | |||||
- margin of 10.0 | |||||
- margin of 100.0 | |||||
- both ABSOLUTE and PERCENT strategies | |||||
TODO: figure out what should be done with negative margins & margins > 100.0 | |||||
""" | |||||
tta = adjusters.TargetTimeAdjuster() | |||||
with self.subTest("ABSOLUTE"): | |||||
strategy = adjusters.TargetTimeAdjuster._STRATEGY.ABSOLUTE | |||||
test_cases = [] | |||||
# populate test cases with tuples of (time, margin, expected) | |||||
# zero margin | |||||
test_cases.append((60.0, 0.0, (60.0, 60.0))) | |||||
# margin of 5.0 | |||||
test_cases.append((60.0, 5.0, (55.0, 65.0))) | |||||
# margin of 10.0 | |||||
test_cases.append((60.0, 10.0, (50.0, 70.0))) | |||||
# margin of 100.0 | |||||
test_cases.append((60.0, 100.0, (0.0, 160.0))) | |||||
# test | |||||
for time, margin, expected in test_cases: | |||||
self.assertEqual(tta._determine_margin(time, margin, strategy), expected) | |||||
with self.subTest("PERCENT"): | |||||
strategy = adjusters.TargetTimeAdjuster._STRATEGY.PERCENT | |||||
test_cases = [] | |||||
# populate test cases with tuples of (time, margin, expected) as above | |||||
# zero margin | |||||
test_cases.append((60.0, 0.0, (60.0, 60.0))) | |||||
# margin of 5.0 | |||||
test_cases.append((60.0, 5.0, (57.0, 63.0))) | |||||
# margin of 10.0 | |||||
test_cases.append((60.0, 10.0, (54.0, 66.0))) | |||||
# margin of 100.0 | |||||
test_cases.append((60.0, 100.0, (0.0, 120.0))) | |||||
# test | |||||
for time, margin, expected in test_cases: | |||||
self.assertEqual(tta._determine_margin(time, margin, strategy), expected) | |||||
def test_adjust_no_change(self): | |||||
"""Test adjusting of list of Features using TTA -- no change to list of Features | |||||
Cases: | |||||
- no Features --> [] | |||||
- [Features] with total time < target time --> unchanged list | |||||
- [Features] with total time = target time --> unchanged list | |||||
TODO: test with Features > target | |||||
""" | |||||
with self.subTest("no Features"): | |||||
tta = adjusters.TargetTimeAdjuster() | |||||
self.assertEqual(tta.adjust(), []) | |||||
with self.subTest("Features < target time"): | |||||
features = [] | |||||
for i in range(1, 5): | |||||
features.append(make_feature(duration=i*1.0)) | |||||
tta = adjusters.TargetTimeAdjuster(features=features, target_time=20.0) | |||||
self.assertEqual(tta.adjust(), features) | |||||
with self.subTest("Features = target time"): | |||||
features = [] | |||||
for i in range(1, 5): | |||||
features.append(make_feature(duration=i*1.0)) | |||||
tta = adjusters.TargetTimeAdjuster(features=features, target_time=10.0) | |||||
self.assertEqual(tta.adjust(), features) | |||||
def test_sort_by_score_time(self): | |||||
"""Test sorting of list of Features by score (primary) and time (secondary) | |||||
Cases: | |||||
- [(15.0, 1.0), (10.0, 1.0), (12.0, 1.0)] --> [(10.0, 1.0), (12.0, 1.0), (15.0, 1.0)] # score equal, sort by time | |||||
- [(15.0, 1.0), (10.0, 4.0), (12.0, 3.0)] --> [(15.0, 1.0), (12.0, 3.0), (10.0, 4.0)] # sort by score | |||||
- [(15.0, 1.0), (10.0, 1.0), (12.0, 2.0)] --> [(10.0, 1.0), (15.0, 1.0), (12.0, 2.0)] # mixed: scores below duration | |||||
- [] --> [] | |||||
- [(15.0, 1.0)] --> [(15.0, 1.0)] | |||||
Cases giving RDH trouble: | |||||
- [(16.0, 1.0), (16.0, 1.0), (1.0, 1.0), (1.0, 1.0)] --> [(1.0, 1.0), (1.0, 1.0), (16.0, 1.0), (16.0, 1.0)] # multiple lowest scoring, multiple shortest duration | |||||
""" | |||||
tta = adjusters.TargetTimeAdjuster() | |||||
with self.subTest("score equal, sort by duration"): | |||||
features = [ | |||||
make_feature(duration=15.0, score=1.0), | |||||
make_feature(duration=10.0, score=1.0), | |||||
make_feature(duration=12.0, score=1.0) | |||||
] | |||||
self.assertEqual(tta._sort_by_score_time(features), [features[1], features[2], features[0]]) | |||||
with self.subTest("sort by score, duration irrelevant"): | |||||
features = [ | |||||
make_feature(duration=15.0, score=1.0), | |||||
make_feature(duration=10.0, score=4.0), | |||||
make_feature(duration=12.0, score=3.0) | |||||
] | |||||
self.assertEqual(tta._sort_by_score_time(features), [features[0], features[2], features[1]]) | |||||
with self.subTest("mixed: scores below duration"): | |||||
features = [ | |||||
make_feature(duration=15.0, score=1.0), | |||||
make_feature(duration=10.0, score=1.0), | |||||
make_feature(duration=12.0, score=2.0) | |||||
] | |||||
self.assertEqual(tta._sort_by_score_time(features), [features[1], features[0], features[2]]) | |||||
with self.subTest("empty"): | |||||
self.assertEqual(tta._sort_by_score_time([]), []) | |||||
with self.subTest("single"): | |||||
features = [mock.Mock(duration=15.0, score=1.0)] | |||||
self.assertEqual(tta._sort_by_score_time(features), features) | |||||
with self.subTest("multiple lowest scoring, multiple shortest duration"): | |||||
features = [ | |||||
make_feature(duration=16.0, score=1.0), | |||||
make_feature(duration=16.0, score=1.0), | |||||
make_feature(duration=1.0, score=1.0), | |||||
make_feature(duration=1.0, score=1.0) | |||||
] | |||||
self.assertEqual(tta._sort_by_score_time(features), [features[2], features[3], features[0], features[1]]) | |||||
def test_adjust_changes(self): | |||||
"""Test adjusting of list of Features using TTA -- changes to list of Features | |||||
All cases have total time > target time. | |||||
In the cases, specification is Feature(duration, score) | |||||
Cases: | |||||
- target = 30.0, margin = 0.0 | |||||
+ [(15.0, 1.0), (10.0, 1.0), (12.0, 1.0)] --> [(15.0, 1.0), (12.0, 1.0)] # scores equal, drop smallest | |||||
+ [(15.0, 2.0), (10.0, 2.0), (12.0, 1.0)] --> [(15.0, 1.0), (10.0, 1.0)] # drop lowest scoring (1) | |||||
+ [(15.0, 1.0), (10.0, 1.0), (12.0, 2.0)] --> [(15.0, 1.0), (12.0, 2.0)] # drop lowest scoring (2) | |||||
- target = 30.0, margin = 4.0 | |||||
+ [(15.0, 1.0), (10.0, 2.0), (12.0, 1.0)] --> [(15.0, 1.0), (12.0, 1.0)] # not lowest scoring, but within margin | |||||
+ [(16.0, 1.0), (16.0, 1.0), (1.0, 1.0), (1.0, 1.0)] --> [(16.0, 1.0), (16.0, 1.0)] # drop multiple lowest scoring, shortest duration | |||||
""" | |||||
# target 30.0, margin 0.0 cases | |||||
target, margin = 30.0, 0.0 | |||||
with self.subTest(f"target {target} margin {margin}"): | |||||
with self.subTest("scores equal"): | |||||
features = [ | |||||
make_feature(duration=15.0, score=1.0), | |||||
make_feature(duration=10.0, score=1.0), | |||||
make_feature(duration=12.0, score=1.0) | |||||
] | |||||
tta = adjusters.TargetTimeAdjuster(features=features, target_time=target, margin=margin) | |||||
expected = [features[0], features[2]] | |||||
output = tta.adjust() | |||||
self.assertEqual(len(output), 2) | |||||
self.assertEqual(output, expected) | |||||
self.assertEqual(tta.features, expected) | |||||
with self.subTest("drop lowest scoring (1)"): | |||||
features = [ | |||||
make_feature(duration=15.0, score=2.0), | |||||
make_feature(duration=10.0, score=2.0), | |||||
make_feature(duration=12.0, score=1.0) | |||||
] | |||||
tta = adjusters.TargetTimeAdjuster(features=features, target_time=target, margin=margin) | |||||
expected = [features[0], features[1]] | |||||
output = tta.adjust() | |||||
self.assertEqual(len(output), 2) | |||||
self.assertEqual(output, expected) | |||||
self.assertEqual(tta.features, expected) | |||||
with self.subTest("drop lowest scoring (2)"): | |||||
features = [ | |||||
make_feature(duration=15.0, score=1.0), | |||||
make_feature(duration=10.0, score=1.0), | |||||
make_feature(duration=12.0, score=2.0) | |||||
] | |||||
tta = adjusters.TargetTimeAdjuster(features=features, target_time=target, margin=margin) | |||||
expected = [features[0], features[2]] | |||||
output = tta.adjust() | |||||
self.assertEqual(len(output), 2) | |||||
self.assertEqual(output, expected) | |||||
self.assertEqual(tta.features, expected) | |||||
# target 30.0, margin 4.0 cases | |||||
target, margin, strategy = 30.0, 4.0, adjusters.TargetTimeAdjuster._STRATEGY.ABSOLUTE | |||||
with self.subTest(f"target {target} margin {margin}"): | |||||
with self.subTest("not lowest scoring, but within margin"): | |||||
# explanation: dropping the 10.0 feature would put us at 27.0, which is within the margin (26.0, 34.0) | |||||
features = [ | |||||
make_feature(duration=15.0, score=1.0), | |||||
make_feature(duration=10.0, score=2.0), | |||||
make_feature(duration=12.0, score=1.0) | |||||
] | |||||
tta = adjusters.TargetTimeAdjuster(features=features, target_time=target, | |||||
margin=margin, strategy=strategy) | |||||
expected = [features[0], features[2]] | |||||
output = tta.adjust() | |||||
self.assertEqual(len(output), 2) | |||||
self.assertEqual(output, expected) | |||||
self.assertEqual(tta.features, expected) | |||||
with self.subTest("drop multiple lowest scoring, shortest duration"): | |||||
# explanation: dropping the 1.0 features would put us at 32.0, which is within the margin (26.0, 34.0) | |||||
features = [ | |||||
make_feature(duration=16.0, score=1.0), | |||||
make_feature(duration=16.0, score=1.0), | |||||
make_feature(duration=1.0, score=1.0), | |||||
make_feature(duration=1.0, score=1.0), | |||||
make_feature(duration=1.0, score=1.0), | |||||
make_feature(duration=1.0, score=1.0) | |||||
] | |||||
tta = adjusters.TargetTimeAdjuster(features=features, target_time=target, | |||||
margin=margin, strategy=strategy) | |||||
expected = [features[0], features[1], features[2], features[3]] | |||||
output = tta.adjust() | |||||
self.assertEqual(len(output), 4) | |||||
self.assertEqual(output, expected) | |||||
self.assertEqual(tta.features, expected) | |||||
def make_feature(duration, score=1.0): | |||||
"""Helper function to create a MockFeature from duration and score""" | |||||
return MockFeature(interval=MockInterval.from_duration(duration), score=score) |