You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
8.0 KiB

  1. """Classes for producing videos"""
  2. from abc import ABC
  3. import json
  4. import logging
  5. import os
  6. import subprocess
  7. import tempfile
  8. # for visualisations:
  9. import matplotlib.pyplot as plt
  10. # for encoding as JSON
  11. from pipeline.utils import Feature
  12. class Producer(ABC):
  13. """Generic producer interface."""
  14. def __init__(self, features):
  15. """All producers should take a list of features as input"""
  16. def produce(self):
  17. """All Producers should produce something!"""
  18. class VideoProducer(Producer):
  19. """Video producer interface."""
  20. class FfmpegVideoProducer(VideoProducer):
  21. """Produce videos using ffmpeg"""
  22. _CONFIG_DEFAULT_OUTPUT_DIR = "/tmp/"
  23. _CONFIG_DEFAULT_OUTPUT_FILENAME = "highlights.mp4"
  24. _CONFIG_COMPILE_CLIPS = True
  25. def __init__(self, features, compile_clips=_CONFIG_COMPILE_CLIPS,
  26. output_dir=_CONFIG_DEFAULT_OUTPUT_DIR,
  27. output_filename=_CONFIG_DEFAULT_OUTPUT_FILENAME,
  28. ) -> None:
  29. if not features:
  30. raise ValueError("No features provided")
  31. # TODO: consider if we want to permit empty features (producing no video)
  32. self.features = features
  33. self._compile_clips = compile_clips
  34. self._output_dir = output_dir
  35. self._output_filename = output_filename
  36. def _run_no_output(self, cmd: list, cwd:str=".") -> None:
  37. """Run a command and return the output as a string
  38. Defined to be mocked out in tests via unittest.mock.patch
  39. """
  40. subprocess.run(cmd, stdout=None, stderr=None, cwd=cwd)
  41. def _ffmpeg_feature_to_clip(self, feature=None, output_filepath=None):
  42. """use ffmpeg to produve a video clip from a feature"""
  43. OVERWRITE = True # TODO: consider making this a config option
  44. if not feature or not feature.interval:
  45. raise ValueError("No feature provided")
  46. if not output_filepath:
  47. raise ValueError("No output filepath provided")
  48. ffmpeg_prefix = ["ffmpeg", "-y"] if OVERWRITE else ["ffmpeg"]
  49. ffmpeg_suffix = ["-r", "60", "-c:v", "libx264", "-crf", "26", "-c:a", "aac", "-preset", "ultrafast"]
  50. # TODO: match framerate of input video
  51. # TODO: adjustable encoding options
  52. seek = ["-ss", str(feature.interval.start)]
  53. duration = ["-t", str(feature.interval.duration)]
  54. ffmpeg_args = ffmpeg_prefix + seek + ["-i"] + [feature.source.path] +\
  55. duration + ffmpeg_suffix + [output_filepath]
  56. logging.info(f"ffmpeg_args: {ffmpeg_args}")
  57. self._run_no_output(ffmpeg_args)
  58. def _ffmpeg_concat_clips(self, clips=None, output_filepath=None):
  59. """use ffmpeg to concatenate clips into a single video"""
  60. OVERWRITE = True
  61. ffmpeg_prefix = ["ffmpeg"]
  62. ffmpeg_prefix += ["-y"] if OVERWRITE else []
  63. ffmpeg_prefix += ["-f", "concat", "-safe", "0", "-i"]
  64. # there is a method to do this via process substitution, but it's not portable
  65. # so we'll use the input file list method
  66. if not clips:
  67. raise ValueError("No clips provided")
  68. if not output_filepath:
  69. raise ValueError("No output filepath provided")
  70. # generate a temporary file with the list of clips
  71. join_file = tempfile.NamedTemporaryFile(mode="w")
  72. for clip in clips:
  73. join_file.write(f"file '{clip}'\n")
  74. join_file.flush()
  75. ffmpeg_args = ffmpeg_prefix + [join_file.name] + ["-c", "copy", output_filepath]
  76. logging.info(f"ffmpeg_args: {ffmpeg_args}")
  77. self._run_no_output(ffmpeg_args)
  78. join_file.close()
  79. def produce(self):
  80. """Produce clips or a video from the features"""
  81. clips = []
  82. for num, feature in enumerate(self.features):
  83. output_filepath = f"{self._output_dir}/highlight_{num}.mp4"
  84. self._ffmpeg_feature_to_clip(feature, output_filepath)
  85. clips.append(output_filepath)
  86. # concatenate the clips
  87. if self._compile_clips:
  88. output_filepath = f"{self._output_dir}/{self._output_filename}"
  89. self._ffmpeg_concat_clips(clips, output_filepath)
  90. logging.info(f"Produced video: {output_filepath}")
  91. class VisualisationProducer(Producer):
  92. """Visualisation producer -- illustrate the features we have extracted"""
  93. DEFAULT_OUTPUT_FILEPATH = "visualisation.png"
  94. def __init__(self, features, output_filepath=DEFAULT_OUTPUT_FILEPATH):
  95. if not features:
  96. raise ValueError("No features provided")
  97. self.features = features
  98. if not output_filepath:
  99. raise ValueError("No output filepath provided")
  100. self.output_filepath = output_filepath
  101. def _fe_colour(self, feature) -> str:
  102. """Return a colour for a feature
  103. laughter: red
  104. loudness: blue
  105. video activity: green
  106. words: purple
  107. default: pink
  108. """
  109. if feature.feature_extractor == "laughter":
  110. return "red"
  111. if feature.feature_extractor == "loudness":
  112. return "blue"
  113. if feature.feature_extractor == "videoactivity":
  114. return "green"
  115. if feature.feature_extractor == "words":
  116. return "purple"
  117. return "black"
  118. def produce(self):
  119. """Produce visualisation"""
  120. # basic idea: use matplotlib to plot:
  121. # - a wide line segment representing the source video[s]
  122. # - shorter line segments representing the features extracted where:
  123. # + width represents duration
  124. # + colour represents feature type
  125. # + position represents time
  126. # - save as image
  127. plotted_source_videos = []
  128. bar_labels = []
  129. fig, ax = plt.subplots()
  130. for feature in self.features:
  131. # plot source video line if not done already
  132. if feature.source not in plotted_source_videos:
  133. # use video duration as width
  134. # ax.plot([0, feature.source.duration()], [0, 0], color='black', linewidth=10)
  135. ax.broken_barh([(0, feature.source.duration())], (0, 5), facecolors='grey')
  136. plotted_source_videos.append(feature.source)
  137. bar_labels.append(os.path.basename(feature.source.path))
  138. # annotate the source video
  139. ax.text(0.25, 0.25, os.path.basename(feature.source.path), ha='left', va='bottom',
  140. fontsize=16)
  141. # plot feature line
  142. # ax.plot([feature.interval.start, feature.interval.end], [1, 1], color='red', linewidth=5)
  143. ax.broken_barh([(feature.interval.start, feature.interval.duration)],
  144. (10, 5), facecolors=f'{self._fe_colour(feature)}')
  145. if feature.feature_extractor not in bar_labels:
  146. bar_labels.append(feature.feature_extractor)
  147. # label bar with feature extractor
  148. # ax.text(0, 8, feature.feature_extractor, ha='left', va='bottom',
  149. # fontsize=16)
  150. # label the plot's axes
  151. ax.set_xlabel('Time')
  152. # ax.set_yticks([], labels=bar_labels)
  153. ax.set_yticks([])
  154. # ax.tick_params(axis='y', labelrotation=90, ha='right')
  155. # save the plot
  156. plt.savefig(self.output_filepath)
  157. plt.close()
  158. class PipelineJSONEncoder(json.JSONEncoder):
  159. def default(self, obj):
  160. if hasattr(obj, 'to_json'):
  161. return obj.to_json()
  162. else:
  163. return json.JSONEncoder.default(self, obj)
  164. class JSONProducer(Producer):
  165. """Produce JSON output"""
  166. DEFAULT_OUTPUT_FILEPATH = "features.json"
  167. def __init__(self, features, output_filepath=DEFAULT_OUTPUT_FILEPATH):
  168. if not features:
  169. raise ValueError("No features provided")
  170. self.features = features
  171. if not output_filepath:
  172. raise ValueError("No output filepath provided")
  173. self.output_filepath = output_filepath
  174. def produce(self):
  175. with open(self.output_filepath, "w") as jsonfile:
  176. jsonfile.write(json.dumps(self.features, cls=PipelineJSONEncoder, indent=4))