  1. """Classes for producing videos"""
  2. from abc import ABC
  3. import json
  4. import logging
  5. import os
  6. import subprocess
  7. import tempfile
  8. # for visualisations:
  9. import matplotlib.pyplot as plt
  10. # for encoding as JSON
  11. from pipeline.utils import Feature
class Producer(ABC):
    """Generic producer interface."""

    def __init__(self, features):
        """All producers should take a list of features as input"""

    def produce(self):
        """All Producers should produce something!"""


class VideoProducer(Producer):
    """Video producer interface."""


class FfmpegVideoProducer(VideoProducer):
    """Produce videos using ffmpeg"""

    # TODO: consider output filename options
    _CONFIG_COMPILE_CLIPS = True

    def __init__(self, features, compile_clips=_CONFIG_COMPILE_CLIPS) -> None:
        if not features:
            raise ValueError("No features provided")
        # TODO: consider if we want to permit empty features (producing no video)
        self.features = features
        self._compile_clips = compile_clips
    def _run_no_output(self, cmd: list, cwd: str = ".") -> None:
        """Run a command, discarding its output.

        Defined to be mocked out in tests via unittest.mock.patch
        """
        subprocess.run(cmd, stdout=None, stderr=None, cwd=cwd)
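    # Sketch of how a test might stub this method out so no real ffmpeg process
    # is spawned (illustrative only; the test code is not part of this module):
    #
    #     with unittest.mock.patch.object(
    #             FfmpegVideoProducer, "_run_no_output") as fake_run:
    #         FfmpegVideoProducer(features).produce()
    #         fake_run.assert_called()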
    def _ffmpeg_feature_to_clip(self, feature=None, output_filepath=None):
        """use ffmpeg to produce a video clip from a feature"""
        OVERWRITE = True  # TODO: consider making this a config option
        if not feature or not feature.interval:
            raise ValueError("No feature provided")
        if not output_filepath:
            raise ValueError("No output filepath provided")
        ffmpeg_prefix = ["ffmpeg", "-y"] if OVERWRITE else ["ffmpeg"]
        ffmpeg_suffix = ["-r", "60", "-c:v", "libx264", "-crf", "26", "-c:a", "aac", "-preset", "ultrafast"]
        # TODO: match framerate of input video
        # TODO: adjustable encoding options
        seek = ["-ss", str(feature.interval.start)]
        duration = ["-t", str(feature.interval.duration)]
        ffmpeg_args = ffmpeg_prefix + seek + ["-i"] + [feature.source.path] + \
            duration + ffmpeg_suffix + [output_filepath]
        logging.info(f"ffmpeg_args: {ffmpeg_args}")
        self._run_no_output(ffmpeg_args)
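    # For a feature starting at 12.0s and lasting 5.0s in "input.mp4", the
    # argument list built above corresponds roughly to (illustrative values only):
    #
    #     ffmpeg -y -ss 12.0 -i input.mp4 -t 5.0 -r 60 -c:v libx264 -crf 26 \
    #         -c:a aac -preset ultrafast /tmp/highlight_0.mp4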
    def _ffmpeg_concat_clips(self, clips=None, output_filepath=None):
        """use ffmpeg to concatenate clips into a single video"""
        OVERWRITE = True
        ffmpeg_prefix = ["ffmpeg"]
        ffmpeg_prefix += ["-y"] if OVERWRITE else []
        ffmpeg_prefix += ["-f", "concat", "-safe", "0", "-i"]
        # there is a method to do this via process substitution, but it's not portable
        # so we'll use the input file list method
        if not clips:
            raise ValueError("No clips provided")
        if not output_filepath:
            raise ValueError("No output filepath provided")
        # generate a temporary file with the list of clips
        join_file = tempfile.NamedTemporaryFile(mode="w")
        for clip in clips:
            join_file.write(f"file '{clip}'\n")
        join_file.flush()
        ffmpeg_args = ffmpeg_prefix + [join_file.name] + ["-c", "copy", output_filepath]
        logging.info(f"ffmpeg_args: {ffmpeg_args}")
        self._run_no_output(ffmpeg_args)
        join_file.close()
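    # The temporary list file consumed by ffmpeg's concat demuxer ends up looking
    # like this (illustrative paths), and the assembled command is roughly
    # `ffmpeg -y -f concat -safe 0 -i <listfile> -c copy <output>`:
    #
    #     file '/tmp/highlight_0.mp4'
    #     file '/tmp/highlight_1.mp4'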
    def produce(self):
        OUTPUT_DIR = "/tmp"  # TODO: make this a config option
        clips = []
        for num, feature in enumerate(self.features):
            output_filepath = f"{OUTPUT_DIR}/highlight_{num}.mp4"
            self._ffmpeg_feature_to_clip(feature, output_filepath)
            clips.append(output_filepath)
        # concatenate the clips
        if self._compile_clips:
            output_filepath = f"{OUTPUT_DIR}/highlights.mp4"
            self._ffmpeg_concat_clips(clips, output_filepath)
            logging.info(f"Produced video: {output_filepath}")
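# Typical usage sketch (assumes `features` is a non-empty list of
# pipeline.utils.Feature objects produced by an upstream extraction step):
#
#     FfmpegVideoProducer(features).produce()                        # clips + compilation
#     FfmpegVideoProducer(features, compile_clips=False).produce()   # individual clips only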
class VisualisationProducer(Producer):
    """Visualisation producer -- illustrate the features we have extracted"""

    def __init__(self, features):
        if not features:
            raise ValueError("No features provided")
        self.features = features

    def _fe_colour(self, feature) -> str:
        """Return a colour for a feature

        laughter: red
        loudness: blue
        video activity: green
        words: purple
        default: black
        """
        if feature.feature_extractor == "laughter":
            return "red"
        if feature.feature_extractor == "loudness":
            return "blue"
        if feature.feature_extractor == "videoactivity":
            return "green"
        if feature.feature_extractor == "words":
            return "purple"
        return "black"
    def produce(self):
        """Produce visualisation"""
        # basic idea: use matplotlib to plot:
        # - a wide line segment representing the source video[s]
        # - shorter line segments representing the features extracted where:
        #   + width represents duration
        #   + colour represents feature type
        #   + position represents time
        # - save as image
        plotted_source_videos = []
        bar_labels = []
        fig, ax = plt.subplots()
        for feature in self.features:
            # plot source video line if not done already
            if feature.source not in plotted_source_videos:
                # use video duration as width
                # ax.plot([0, feature.source.duration()], [0, 0], color='black', linewidth=10)
                ax.broken_barh([(0, feature.source.duration())], (0, 5), facecolors='grey')
                plotted_source_videos.append(feature.source)
                bar_labels.append(os.path.basename(feature.source.path))
                # annotate the source video
                ax.text(0.25, 0.25, os.path.basename(feature.source.path), ha='left', va='bottom',
                        fontsize=16)
            # plot feature line
            # ax.plot([feature.interval.start, feature.interval.end], [1, 1], color='red', linewidth=5)
            ax.broken_barh([(feature.interval.start, feature.interval.duration)],
                           (10, 5), facecolors=f'{self._fe_colour(feature)}')
            if feature.feature_extractor not in bar_labels:
                bar_labels.append(feature.feature_extractor)
                # label bar with feature extractor
                # ax.text(0, 8, feature.feature_extractor, ha='left', va='bottom',
                #         fontsize=16)
        # label the plot's axes
        ax.set_xlabel('Time')
        # ax.set_yticks([], labels=bar_labels)
        ax.set_yticks([])
        # ax.tick_params(axis='y', labelrotation=90, ha='right')
        # save the plot
        plt.savefig("/tmp/visualisation.png")
        plt.close()
class PipelineJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, 'to_json'):
            return obj.to_json()
        else:
            return json.JSONEncoder.default(self, obj)
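# Any object exposing a `to_json()` method (the Feature class is assumed to,
# returning something the stock encoder can already handle) is serialised via
# that method; anything else falls through to json.JSONEncoder.default, which
# raises TypeError for unsupported types.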
class JSONProducer(Producer):
    """Produce JSON output"""

    def __init__(self, features):
        if not features:
            raise ValueError("No features provided")
        self.features = features

    def produce(self):
        # FIXME: config option for output path
        with open("/tmp/features.json", "w") as jsonfile:
            jsonfile.write(json.dumps(self.features, cls=PipelineJSONEncoder, indent=4))
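# Example of running all three producers over the same feature list (sketch;
# `features` must be a non-empty list of pipeline.utils.Feature objects):
#
#     for producer_cls in (FfmpegVideoProducer, VisualisationProducer, JSONProducer):
#         producer_cls(features).produce()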