-
Notifications
You must be signed in to change notification settings - Fork 195
/
Copy pathvideo_split_by_scene_mapper.py
150 lines (125 loc) · 5.68 KB
/
video_split_by_scene_mapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import math
import re
from itertools import chain
from pydantic import NonNegativeFloat, NonNegativeInt
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import (add_suffix_to_filename,
transfer_filename)
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import SpecialTokens
from ..base_op import OPERATORS, Mapper
# Handle to the third-party `scenedetect` package, obtained through the
# project's LazyLoader helper.
# NOTE(review): presumably the real import is deferred until the first
# attribute access -- confirm against LazyLoader's implementation.
scenedetect = LazyLoader('scenedetect', 'scenedetect')
# Name under which this operator is registered in OPERATORS; it is also
# passed to transfer_filename when deriving the output clip paths.
OP_NAME = 'video_split_by_scene_mapper'
def replace_func(match, scene_counts_iter):
    """Build the replacement text for one matched video token.

    Intended as the callback of ``re.sub``: every matched token consumes the
    next value from ``scene_counts_iter`` and is expanded to that many video
    tokens.

    :param match: the regex match object of the current video token.
    :param scene_counts_iter: iterator yielding, in order, how many tokens
        each matched video token should be expanded to.
    :return: the video token repeated ``count`` times, or the matched text
        unchanged once the iterator is exhausted.
    """
    try:
        repeat = next(scene_counts_iter)
    except StopIteration:
        # more tokens in the text than counts available: keep the token as is
        return match.group(0)
    return SpecialTokens.video * repeat
@OPERATORS.register_module(OP_NAME)
class VideoSplitBySceneMapper(Mapper):
    """Mapper to cut videos into scene clips.

    Every video referenced by a sample is run through a scenedetect detector.
    A video in which more than one scene is found is split with ffmpeg into
    one clip per scene; any other video is kept unchanged. The sample's video
    list, its source-file list and the video tokens in its text are rewritten
    to match the resulting clips.
    """

    # Supported detector names, each mapped to the optional keyword arguments
    # that are picked out of ``**kwargs`` and forwarded to that detector's
    # constructor. The attribute keeps its historical spelling because it is
    # reachable from outside the class.
    avaliable_detectors = {
        'ContentDetector': ['weights', 'luma_only', 'kernel_size'],
        'AdaptiveDetector': [
            'window_width', 'min_content_val', 'weights', 'luma_only',
            'kernel_size', 'video_manager', 'min_delta_hsv'
        ],
        'ThresholdDetector':
        ['fade_bias', 'add_final_scene', 'method', 'block_size']
    }

    def __init__(self,
                 detector: str = 'ContentDetector',
                 threshold: NonNegativeFloat = 27.0,
                 min_scene_len: NonNegativeInt = 15,
                 show_progress: bool = False,
                 *args,
                 **kwargs):
        """
        Initialization method.

        :param detector: Algorithm from `scenedetect.detectors`. Should be one
            of ['ContentDetector', 'ThresholdDetector', 'AdaptiveDetector'].
        :param threshold: Threshold passed to the detector.
        :param min_scene_len: Minimum length of any scene.
        :param show_progress: Whether to show progress from scenedetect.
        :param args: extra args
        :param kwargs: extra args; keys listed for the chosen detector in
            `avaliable_detectors` are also forwarded to the detector.
        :raises ValueError: if `detector` is not a supported detector name.
        """
        super().__init__(*args, **kwargs)
        # Must be taken before any further local is bound so that the
        # snapshot only contains the constructor arguments.
        self._init_parameters = self.remove_extra_parameters(locals())

        if detector not in self.avaliable_detectors:
            raise ValueError(
                f'Scene detector {detector} is not supported. '
                f'Can only be one of {list(self.avaliable_detectors.keys())}')

        self.detector = detector
        self.threshold = threshold
        self.min_scene_len = min_scene_len
        self.show_progress = show_progress

        # prepare detector args
        avaliable_kwargs = self.avaliable_detectors[self.detector]
        self.detector_class = getattr(scenedetect.detectors, self.detector)
        self.detector_kwargs = {
            key: kwargs[key]
            for key in avaliable_kwargs if key in kwargs
        }

    def process_single(self, sample, context=False):
        """Split every video of `sample` into scene clips.

        :param sample: the sample to process; it is modified in place.
        :param context: not used by this method.
        :return: the same sample, with its video list replaced by the clip
            paths, `Fields.source_file` listing the originating video of
            every clip, and (when the text field is present) each video
            token repeated once per clip of the corresponding video.
        """
        # there is no video in this sample
        if self.video_key not in sample or not sample[self.video_key]:
            sample[Fields.source_file] = []
            return sample

        # load videos
        loaded_video_keys = sample[self.video_key]
        # original video path -> list of paths that replace it
        output_video_keys = {}

        for video_key in loaded_video_keys:
            # skip duplicate
            if video_key in output_video_keys:
                continue

            redirected_video_key = transfer_filename(video_key, OP_NAME,
                                                     **self._init_parameters)
            output_template = add_suffix_to_filename(redirected_video_key,
                                                     '_$SCENE_NUMBER')

            # detect scenes
            detector = self.detector_class(self.threshold, self.min_scene_len,
                                           **self.detector_kwargs)
            scene_list = scenedetect.detect(video_key,
                                            detector,
                                            show_progress=self.show_progress,
                                            start_in_scene=True)
            num_scenes = len(scene_list)

            if num_scenes > 1:
                # sync with split_video_ffmpeg internal: the zero-padding
                # width must be computed with exactly the same expression,
                # otherwise the predicted names would not match the files
                # written by ffmpeg.
                scene_num_format = f'%0{max(3, math.floor(math.log(num_scenes, 10)) + 1)}d'  # noqa: E501
                output_video_keys[video_key] = [
                    output_template.replace('$SCENE_NUMBER',
                                            scene_num_format % (i + 1))
                    for i in range(num_scenes)
                ]
                # split video into clips
                scenedetect.split_video_ffmpeg(
                    input_video_path=video_key,
                    scene_list=scene_list,
                    output_file_template=output_template,
                    show_progress=self.show_progress)
            else:
                # zero or one scene: nothing to cut, keep the original video
                output_video_keys[video_key] = [video_key]

        # replace splited video tokens
        if self.text_key in sample:
            # The number of tokens is derived from the clips actually
            # emitted, so the text stays aligned with the video list even
            # when scene detection returns an empty list and the original
            # video is kept.
            scene_counts_iter = iter(
                [len(output_video_keys[key]) for key in loaded_video_keys])
            updated_text = re.sub(
                re.escape(SpecialTokens.video),
                lambda match: replace_func(match, scene_counts_iter),
                sample[self.text_key])
            sample[self.text_key] = updated_text

        # when the file is modified, its source file needs to be updated.
        sample[Fields.source_file] = []
        for value in loaded_video_keys:
            sample[Fields.source_file].extend([value] *
                                              len(output_video_keys[value]))

        sample[self.video_key] = list(
            chain.from_iterable(
                [output_video_keys[key] for key in loaded_video_keys]))
        return sample