-
Notifications
You must be signed in to change notification settings - Fork 191
/
ray_video_deduplicator.py
59 lines (48 loc) · 1.91 KB
/
ray_video_deduplicator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import hashlib
from pydantic import PositiveInt
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)
from ..base_op import OPERATORS
from ..op_fusion import LOADED_VIDEOS
from .ray_basic_deduplicator import RayBasicDeduplicator
OP_NAME = 'ray_video_deduplicator'
@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class RayVideoDeduplicator(RayBasicDeduplicator):
"""
Deduplicator to deduplicate samples at document-level using exact matching
of videos between documents.
"""
def __init__(self,
redis_host: str = 'localhost',
redis_port: PositiveInt = 6380,
*args,
**kwargs):
"""
Initialization.
:param redis_host: the hostname of redis server
:param redis_port: the port of redis server
:param args: extra args
:param kwargs: extra args
"""
super().__init__(redis_host=redis_host,
redis_port=redis_port,
*args,
**kwargs)
def calculate_hash(self, sample, context=False):
if self.video_key not in sample or not sample[self.video_key]:
return RayBasicDeduplicator.EMPTY_HASH_VALUE
# load videos
loaded_video_keys = sample[self.video_key]
sample, videos = load_data_with_context(sample, context,
loaded_video_keys, load_video)
# compute hash
md5_hash = hashlib.md5()
for key in videos:
# consider the multi stream of video in one container
for packet in videos[key].demux():
if packet.stream.type == 'video':
md5_hash.update(bytes(packet))
for key in videos:
close_video(videos[key])
return md5_hash.hexdigest()