  1. """Classes for producing videos"""
  2. from abc import ABC
  3. import json
  4. import logging
  5. import os
  6. import subprocess
  7. import tempfile
  8. # for visualisations:
  9. import matplotlib.pyplot as plt
  10. # for encoding as JSON
  11. from pipeline.utils import Feature
  12. class Producer(ABC):
  13. """Generic producer interface."""
  14. def __init__(self, features):
  15. """All producers should take a list of features as input"""
  16. def produce(self):
  17. """All Producers should produce something!"""
  18. class VideoProducer(Producer):
  19. """Video producer interface."""
  20. class FfmpegVideoProducer(VideoProducer):
  21. """Produce videos using ffmpeg"""
  22. # TODO: consider output filename options
  24. _CONFIG_DEFAULT_OUTPUT_FILENAME = "highlights.mp4"
  26. def __init__(self, features, compile_clips=_CONFIG_COMPILE_CLIPS,
  27. output_dir=_CONFIG_DEFAULT_OUTPUT_DIR,
  28. output_filename=_CONFIG_DEFAULT_OUTPUT_FILENAME,
  29. ) -> None:
  30. if not features:
  31. raise ValueError("No features provided")
  32. # TODO: consider if we want to permit empty features (producing no video)
  33. self.features = features
  34. self._compile_clips = compile_clips
  35. self._output_dir = output_dir
  36. self._output_filename = output_filename
  37. def _run_no_output(self, cmd: list, cwd:str=".") -> None:
  38. """Run a command and return the output as a string
  39. Defined to be mocked out in tests via unittest.mock.patch
  40. """
  41., stdout=None, stderr=None, cwd=cwd)
  42. def _ffmpeg_feature_to_clip(self, feature=None, output_filepath=None):
  43. """use ffmpeg to produve a video clip from a feature"""
  44. OVERWRITE = True # TODO: consider making this a config option
  45. if not feature or not feature.interval:
  46. raise ValueError("No feature provided")
  47. if not output_filepath:
  48. raise ValueError("No output filepath provided")
  49. ffmpeg_prefix = ["ffmpeg", "-y"] if OVERWRITE else ["ffmpeg"]
  50. ffmpeg_suffix = ["-r", "60", "-c:v", "libx264", "-crf", "26", "-c:a", "aac", "-preset", "ultrafast"]
  51. # TODO: match framerate of input video
  52. # TODO: adjustable encoding options
  53. seek = ["-ss", str(feature.interval.start)]
  54. duration = ["-t", str(feature.interval.duration)]
  55. ffmpeg_args = ffmpeg_prefix + seek + ["-i"] + [feature.source.path] +\
  56. duration + ffmpeg_suffix + [output_filepath]
  57."ffmpeg_args: {ffmpeg_args}")
  58. self._run_no_output(ffmpeg_args)
  59. def _ffmpeg_concat_clips(self, clips=None, output_filepath=None):
  60. """use ffmpeg to concatenate clips into a single video"""
  61. OVERWRITE = True
  62. ffmpeg_prefix = ["ffmpeg"]
  63. ffmpeg_prefix += ["-y"] if OVERWRITE else []
  64. ffmpeg_prefix += ["-f", "concat", "-safe", "0", "-i"]
  65. # there is a method to do this via process substitution, but it's not portable
  66. # so we'll use the input file list method
  67. if not clips:
  68. raise ValueError("No clips provided")
  69. if not output_filepath:
  70. raise ValueError("No output filepath provided")
  71. # generate a temporary file with the list of clips
  72. join_file = tempfile.NamedTemporaryFile(mode="w")
  73. for clip in clips:
  74. join_file.write(f"file '{clip}'\n")
  75. join_file.flush()
  76. ffmpeg_args = ffmpeg_prefix + [] + ["-c", "copy", output_filepath]
  77."ffmpeg_args: {ffmpeg_args}")
  78. self._run_no_output(ffmpeg_args)
  79. join_file.close()
  80. def produce(self):
  81. """Produce clips or a video from the features"""
  82. clips = []
  83. for num, feature in enumerate(self.features):
  84. output_filepath = f"{self._output_dir}/highlight_{num}.mp4"
  85. self._ffmpeg_feature_to_clip(feature, output_filepath)
  86. clips.append(output_filepath)
  87. # concatenate the clips
  88. if self._compile_clips:
  89. output_filepath = f"{self._output_dir}/{self._output_filename}"
  90. self._ffmpeg_concat_clips(clips, output_filepath)
  91."Produced video: {output_filepath}")
  92. class VisualisationProducer(Producer):
  93. """Visualisation producer -- illustrate the features we have extracted"""
  94. def __init__(self, features):
  95. if not features:
  96. raise ValueError("No features provided")
  97. self.features = features
  98. def _fe_colour(self, feature) -> str:
  99. """Return a colour for a feature
  100. laughter: red
  101. loudness: blue
  102. video activity: green
  103. words: purple
  104. default: pink
  105. """
  106. if feature.feature_extractor == "laughter":
  107. return "red"
  108. if feature.feature_extractor == "loudness":
  109. return "blue"
  110. if feature.feature_extractor == "videoactivity":
  111. return "green"
  112. if feature.feature_extractor == "words":
  113. return "purple"
  114. return "black"
  115. def produce(self):
  116. """Produce visualisation"""
  117. # basic idea: use matplotlib to plot:
  118. # - a wide line segment representing the source video[s]
  119. # - shorter line segments representing the features extracted where:
  120. # + width represents duration
  121. # + colour represents feature type
  122. # + position represents time
  123. # - save as image
  124. plotted_source_videos = []
  125. bar_labels = []
  126. fig, ax = plt.subplots()
  127. for feature in self.features:
  128. # plot source video line if not done already
  129. if feature.source not in plotted_source_videos:
  130. # use video duration as width
  131. # ax.plot([0, feature.source.duration()], [0, 0], color='black', linewidth=10)
  132. ax.broken_barh([(0, feature.source.duration())], (0, 5), facecolors='grey')
  133. plotted_source_videos.append(feature.source)
  134. bar_labels.append(os.path.basename(feature.source.path))
  135. # annotate the source video
  136. ax.text(0.25, 0.25, os.path.basename(feature.source.path), ha='left', va='bottom',
  137. fontsize=16)
  138. # plot feature line
  139. # ax.plot([feature.interval.start, feature.interval.end], [1, 1], color='red', linewidth=5)
  140. ax.broken_barh([(feature.interval.start, feature.interval.duration)],
  141. (10, 5), facecolors=f'{self._fe_colour(feature)}')
  142. if feature.feature_extractor not in bar_labels:
  143. bar_labels.append(feature.feature_extractor)
  144. # label bar with feature extractor
  145. # ax.text(0, 8, feature.feature_extractor, ha='left', va='bottom',
  146. # fontsize=16)
  147. # label the plot's axes
  148. ax.set_xlabel('Time')
  149. # ax.set_yticks([], labels=bar_labels)
  150. ax.set_yticks([])
  151. # ax.tick_params(axis='y', labelrotation=90, ha='right')
  152. # save the plot
  153. plt.savefig("/tmp/visualisation.png")
  154. plt.close()
  155. class PipelineJSONEncoder(json.JSONEncoder):
  156. def default(self, obj):
  157. if hasattr(obj, 'to_json'):
  158. return obj.to_json()
  159. else:
  160. return json.JSONEncoder.default(self, obj)
  161. class JSONProducer(Producer):
  162. """Produce JSON output"""
  163. def __init__(self, features):
  164. if not features:
  165. raise ValueError("No features provided")
  166. self.features = features
  167. def produce(self):
  168. # FIXME: config option for output path
  169. with open("/tmp/features.json", "w") as jsonfile:
  170. jsonfile.write(json.dumps(self.features, cls=PipelineJSONEncoder, indent=4))