  1. """Classes for producing videos"""
  2. from abc import ABC
  3. import json
  4. import logging
  5. import os
  6. import subprocess
  7. import tempfile
  8. # for visualisations:
  9. import matplotlib.pyplot as plt
  10. # for encoding as JSON
  11. from pipeline.utils import Feature
  12. class Producer(ABC):
  13. """Generic producer interface."""
  14. def __init__(self, features):
  15. """All producers should take a list of features as input"""
  16. def produce(self):
  17. """All Producers should produce something!"""
  18. class VideoProducer(Producer):
  19. """Video producer interface."""
  20. class FfmpegVideoProducer(VideoProducer):
  21. """Produce videos using ffmpeg"""
  22. # TODO: consider output filename options
  23. _CONFIG_COMPILE_CLIPS = True
  24. def __init__(self, features, compile_clips=_CONFIG_COMPILE_CLIPS) -> None:
  25. if not features:
  26. raise ValueError("No features provided")
  27. # TODO: consider if we want to permit empty features (producing no video)
  28. self.features = features
  29. self._compile_clips = compile_clips
  30. def _run_no_output(self, cmd: list, cwd:str=".") -> None:
  31. """Run a command and return the output as a string
  32. Defined to be mocked out in tests via unittest.mock.patch
  33. """
  34. subprocess.run(cmd, stdout=None, stderr=None, cwd=cwd)
  35. def _ffmpeg_feature_to_clip(self, feature=None, output_filepath=None):
  36. """use ffmpeg to produve a video clip from a feature"""
  37. OVERWRITE = True # TODO: consider making this a config option
  38. if not feature or not feature.interval:
  39. raise ValueError("No feature provided")
  40. if not output_filepath:
  41. raise ValueError("No output filepath provided")
  42. ffmpeg_prefix = ["ffmpeg", "-y"] if OVERWRITE else ["ffmpeg"]
  43. ffmpeg_suffix = ["-r", "60", "-c:v", "libx264", "-crf", "26", "-c:a", "aac", "-preset", "ultrafast"]
  44. # TODO: match framerate of input video
  45. # TODO: adjustable encoding options
  46. seek = ["-ss", str(feature.interval.start)]
  47. duration = ["-t", str(feature.interval.duration)]
  48. ffmpeg_args = ffmpeg_prefix + seek + ["-i"] + [feature.source.path] +\
  49. duration + ffmpeg_suffix + [output_filepath]
  50. logging.info(f"ffmpeg_args: {ffmpeg_args}")
  51. self._run_no_output(ffmpeg_args)
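
    # For illustration only: with example values for the seek offset, duration
    # and paths (not taken from a real run), the arguments assembled above
    # correspond to a command of this shape:
    #
    #   ffmpeg -y -ss 12.5 -i /path/to/source.mp4 -t 8.0 \
    #       -r 60 -c:v libx264 -crf 26 -c:a aac -preset ultrafast /tmp/highlight_0.mp4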

    def _ffmpeg_concat_clips(self, clips=None, output_filepath=None):
        """Use ffmpeg to concatenate clips into a single video"""
        OVERWRITE = True
        ffmpeg_prefix = ["ffmpeg"]
        ffmpeg_prefix += ["-y"] if OVERWRITE else []
        ffmpeg_prefix += ["-f", "concat", "-safe", "0", "-i"]
        # There is a method to do this via process substitution, but it's not
        # portable, so we'll use the input file list method.
        if not clips:
            raise ValueError("No clips provided")
        if not output_filepath:
            raise ValueError("No output filepath provided")
        # generate a temporary file with the list of clips
        join_file = tempfile.NamedTemporaryFile(mode="w")
        for clip in clips:
            join_file.write(f"file '{clip}'\n")
        join_file.flush()
        ffmpeg_args = ffmpeg_prefix + [join_file.name] + ["-c", "copy", output_filepath]
        logging.info(f"ffmpeg_args: {ffmpeg_args}")
        self._run_no_output(ffmpeg_args)
        join_file.close()
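
    # The temporary concat list written above contains one line per clip, e.g.
    # (paths shown are illustrative; the real ones come from the caller):
    #
    #   file '/tmp/highlight_0.mp4'
    #   file '/tmp/highlight_1.mp4'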

    def produce(self):
        OUTPUT_DIR = "/tmp"  # TODO: make this a config option
        clips = []
        for num, feature in enumerate(self.features):
            output_filepath = f"{OUTPUT_DIR}/highlight_{num}.mp4"
            self._ffmpeg_feature_to_clip(feature, output_filepath)
            clips.append(output_filepath)
        # concatenate the clips
        if self._compile_clips:
            output_filepath = f"{OUTPUT_DIR}/highlights.mp4"
            self._ffmpeg_concat_clips(clips, output_filepath)
        logging.info(f"Produced video: {output_filepath}")


class VisualisationProducer(Producer):
    """Visualisation producer -- illustrate the features we have extracted"""

    def __init__(self, features):
        if not features:
            raise ValueError("No features provided")
        self.features = features

    def _fe_colour(self, feature) -> str:
        """Return a colour for a feature

        laughter: red
        loudness: blue
        video activity: green
        words: purple
        default: black
        """
        if feature.feature_extractor == "laughter":
            return "red"
        if feature.feature_extractor == "loudness":
            return "blue"
        if feature.feature_extractor == "videoactivity":
            return "green"
        if feature.feature_extractor == "words":
            return "purple"
        return "black"

    def produce(self):
        """Produce visualisation"""
        # basic idea: use matplotlib to plot:
        # - a wide line segment representing the source video[s]
        # - shorter line segments representing the features extracted, where:
        #   + width represents duration
        #   + colour represents feature type
        #   + position represents time
        # - save as image
        plotted_source_videos = []
        bar_labels = []
        fig, ax = plt.subplots()
        for feature in self.features:
            # plot source video line if not done already
            if feature.source not in plotted_source_videos:
                # use video duration as width
                # ax.plot([0, feature.source.duration()], [0, 0], color='black', linewidth=10)
                ax.broken_barh([(0, feature.source.duration())], (0, 5), facecolors='grey')
                plotted_source_videos.append(feature.source)
                bar_labels.append(os.path.basename(feature.source.path))
                # annotate the source video
                ax.text(0.25, 0.25, os.path.basename(feature.source.path),
                        ha='left', va='bottom', fontsize=16)
            # plot feature line
            # ax.plot([feature.interval.start, feature.interval.end], [1, 1], color='red', linewidth=5)
            ax.broken_barh([(feature.interval.start, feature.interval.duration)],
                           (10, 5), facecolors=self._fe_colour(feature))
            if feature.feature_extractor not in bar_labels:
                bar_labels.append(feature.feature_extractor)
            # label bar with feature extractor
            # ax.text(0, 8, feature.feature_extractor, ha='left', va='bottom',
            #         fontsize=16)
        # label the plot's axes
        ax.set_xlabel('Time')
        # ax.set_yticks([], labels=bar_labels)
        ax.set_yticks([])
        # ax.tick_params(axis='y', labelrotation=90, ha='right')
        # save the plot
        plt.savefig("/tmp/visualisation.png")
        plt.close()


class PipelineJSONEncoder(json.JSONEncoder):
    """JSON encoder that defers to an object's to_json() method when it has one."""

    def default(self, obj):
        if hasattr(obj, 'to_json'):
            return obj.to_json()
        return json.JSONEncoder.default(self, obj)
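

# Objects serialised with PipelineJSONEncoder are expected to expose a
# to_json() method returning something json.dumps can handle natively.
# A hypothetical example (not the actual pipeline.utils.Feature implementation):
#
#     def to_json(self):
#         return {"feature_extractor": self.feature_extractor,
#                 "start": self.interval.start,
#                 "duration": self.interval.duration,
#                 "source": self.source.path}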


class JSONProducer(Producer):
    """Produce JSON output"""

    def __init__(self, features):
        if not features:
            raise ValueError("No features provided")
        self.features = features

    def produce(self):
        # FIXME: config option for output path
        with open("/tmp/features.json", "w") as jsonfile:
            jsonfile.write(json.dumps(self.features, cls=PipelineJSONEncoder, indent=4))
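

# Example usage (illustrative sketch): `features` is assumed to be a list of
# pipeline.utils.Feature objects built by an upstream extraction stage; how
# they are constructed is outside this module.
#
#     FfmpegVideoProducer(features).produce()    # /tmp/highlight_N.mp4 + /tmp/highlights.mp4
#     VisualisationProducer(features).produce()  # /tmp/visualisation.png
#     JSONProducer(features).produce()           # /tmp/features.json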