-
Notifications
You must be signed in to change notification settings - Fork 5
/
subscriber.go
131 lines (115 loc) · 3.26 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package transform
import (
"encoding/hex"
"fmt"
"log"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/track"
)
// TransformSubscriber is a subscriber that also holds a reference to the
// transform task its OnEvent handler writes video data to.
type TransformSubscriber struct {
// Embedded base subscriber; OnEvent passes events it does not handle itself
// to this type's OnEvent.
Subscriber
// task is the receiver of the writeToFFPipe0 calls made from OnEvent.
task *TransformTask
}
// Delete stops the subscriber, passing a zap field that records the stop
// reason as "for restart".
func (s *TransformSubscriber) Delete() {
	reason := zap.String("reason", "for restart")
	s.Stop(reason)
}
// sliceAppend appends s2 to s1 in place, without ever reallocating.
//
// If the spare capacity of s1 cannot hold all of s2, nothing is copied and s1
// is returned unchanged. Otherwise the returned slice shares s1's backing
// array and has length len(s1)+len(s2), with the bytes of s2 following the
// original contents of s1.
func sliceAppend(s1 []byte, s2 []byte) []byte {
	if cap(s1)-len(s1) < len(s2) {
		return s1
	}
	// The capacity check above guarantees append extends s1 within its existing
	// backing array. Writing to s1[len(s1)+i] directly would panic, because
	// index expressions are bounds-checked against the length, not the capacity.
	return append(s1, s2...)
}
// OnEvent handles events delivered to this subscriber and forwards H.264
// video data to the transform task through writeToFFPipe0.
//
// Handling by event type:
//   - *track.Video: ignored when s.Video is already set. For H.264 the first
//     two parameter sets (logged as SPS and PPS) are each written to the pipe
//     behind a 4-byte 00 00 00 01 start code; H.265 only prints a debug line.
//     The track is then registered with AddTrack.
//   - *track.Audio: ignored when s.Audio is already set, otherwise registered
//     with AddTrack.
//   - VideoFrame: every buffer returned by GetAnnexB is written to the pipe.
//   - VideoRTP, AudioRTP, ISubscriber: only a debug line is printed.
//   - anything else: passed on to the embedded Subscriber's OnEvent.
func (s *TransformSubscriber) OnEvent(event any) {
	// Original note: get the publisher of the transcoded stream.
	t := s.task
	switch v := event.(type) {
	case *track.Video:
		fmt.Println("=====> write track.Video to publisher")
		if s.Video != nil {
			return
		}
		switch v.CodecID {
		case codec.CodecID_H264:
			fmt.Println("=====> CodecID_H264 on sub:", v.PayloadType)
			// Bounds-checked access: a track that carries fewer than two
			// parameter sets yields nil here instead of an index-out-of-range
			// panic.
			paramSet := func(i int) []byte {
				if i < len(v.ParamaterSets) {
					return v.ParamaterSets[i]
				}
				return nil
			}
			sps, pps := paramSet(0), paramSet(1)
			log.Printf("pipe in SPS:%d, [%v]\n",
				len(sps), hex.EncodeToString(sps))
			log.Printf("pipe in PPS:%d, [%v]\n",
				len(pps), hex.EncodeToString(pps))
			// Sample output:
			//2023/04/02 17:07:51 pipe in SPS:35, [6764001fac2ca4014016ec04400000fa000030d43800001e848000186a02ef2e0fa489]
			//2023/04/02 17:07:51 pipe in PPS:4, [68eb8f2c]

			// nal has len == cap == 4, so each append below allocates a fresh
			// buffer and the two writes never share a backing array.
			nal := []byte{0, 0, 0, 1}
			// SPS
			if len(sps) > 0 {
				t.writeToFFPipe0(append(nal, sps...))
			}
			// PPS
			if len(pps) > 0 {
				t.writeToFFPipe0(append(nal, pps...))
			}
		case codec.CodecID_H265:
			// Nothing is written to the pipe for H.265; the disabled sketch
			// below shows VPS/SPS/PPS being written to a publisher video track.
			fmt.Println("=====> CodecID_H265 on sub")
			// vt := p.VideoTrack
			// vt = track.NewH265(p.Stream, v.PayloadType)
			// p.VideoTrack = vt
			// //VPS
			// if len(v.ParamaterSets[0]) > 0 {
			// vt.WriteSliceBytes(v.ParamaterSets[0])
			// }
			// //SPS
			// if len(v.ParamaterSets[1]) > 0 {
			// vt.WriteSliceBytes(v.ParamaterSets[1])
			// }
			// //PPS:
			// if len(v.ParamaterSets[2]) > 0 {
			// vt.WriteSliceBytes(v.ParamaterSets[2])
			// }
		}
		s.AddTrack(v)
	case *track.Audio:
		if s.Audio != nil {
			return
		}
		fmt.Println("=====> write *track.Audio to publisher")
		s.AddTrack(v)
	case VideoFrame:
		// Forward every buffer of the frame's Annex B representation to the pipe.
		for _, buf := range v.GetAnnexB() {
			t.writeToFFPipe0(buf)
		}
	case VideoRTP:
		// RTP video packets are not forwarded; only a debug line is printed.
		fmt.Println("=====> on subscribe VideoRTP")
	case AudioRTP:
		// RTP audio packets are not forwarded; only a debug line is printed.
		fmt.Println("=====> on subscribe AudioRTP to")
	case ISubscriber:
		// Original note: this is the subscription-succeeded event; v is p.
		fmt.Println("=====> begain Subscriber sucess")
	default:
		s.Subscriber.OnEvent(event)
	}
}