Skip to content

Commit

Permalink
Merge pull request #150 from kerberos-io/fix/align-pts2-webrtc
Browse files Browse the repository at this point in the history
Fix/align pts2 webrtc
  • Loading branch information
cedricve authored Jan 2, 2025
2 parents a783914 + 1004731 commit 9e3d705
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 34 deletions.
36 changes: 27 additions & 9 deletions machinery/src/capture/gortsplib.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
// called when a MULAW audio RTP packet arrives
if g.AudioG711Media != nil && g.AudioG711Forma != nil {
g.Client.OnPacketRTP(g.AudioG711Media, g.AudioG711Forma, func(rtppkt *rtp.Packet) {
// decode timestamp
pts, ok := g.Client.PacketPTS(g.AudioG711Media, rtppkt)
// decode timestamp
pts2, ok := g.Client.PacketPTS2(g.AudioG711Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
return
Expand All @@ -427,8 +428,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
IsKeyFrame: false,
Packet: rtppkt,
Data: op,
Time: pts,
CompositionTime: pts,
Time: pts2,
TimeLegacy: pts,
CompositionTime: pts2,
Idx: g.AudioG711Index,
IsVideo: false,
IsAudio: true,
Expand All @@ -443,6 +445,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
g.Client.OnPacketRTP(g.AudioMPEG4Media, g.AudioMPEG4Forma, func(rtppkt *rtp.Packet) {
// decode timestamp
pts, ok := g.Client.PacketPTS(g.AudioMPEG4Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.AudioMPEG4Media, rtppkt)
if !ok {
log.Log.Error("capture.golibrtsp.Start(): " + "unable to get PTS")
return
Expand All @@ -466,8 +469,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
IsKeyFrame: false,
Packet: rtppkt,
Data: enc,
Time: pts,
CompositionTime: pts,
Time: pts2,
TimeLegacy: pts,
CompositionTime: pts2,
Idx: g.AudioG711Index,
IsVideo: false,
IsAudio: true,
Expand All @@ -480,6 +484,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
// called when a video RTP packet arrives for H264
var filteredAU [][]byte
if g.VideoH264Media != nil && g.VideoH264Forma != nil {

dtsExtractor := h264.NewDTSExtractor2()

g.Client.OnPacketRTP(g.VideoH264Media, g.VideoH264Forma, func(rtppkt *rtp.Packet) {

// This will check if we need to stop the thread,
Expand All @@ -494,6 +501,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets

// decode timestamp
pts, ok := g.Client.PacketPTS(g.VideoH264Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.VideoH264Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
return
Expand Down Expand Up @@ -571,12 +579,20 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
return
}

// Extract DTS from RTP packets
dts2, err := dtsExtractor.Extract(filteredAU, pts2)
if err != nil {
log.Log.Error("capture.golibrtsp.Start(): " + err.Error())
return
}

pkt := packets.Packet{
IsKeyFrame: idrPresent,
Packet: rtppkt,
Data: enc,
Time: pts,
CompositionTime: pts,
Time: pts2,
TimeLegacy: pts,
CompositionTime: dts2,
Idx: g.VideoH264Index,
IsVideo: true,
IsAudio: false,
Expand Down Expand Up @@ -639,6 +655,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets

// decode timestamp
pts, ok := g.Client.PacketPTS(g.VideoH265Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.VideoH265Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
return
Expand Down Expand Up @@ -702,8 +719,9 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
IsKeyFrame: isRandomAccess,
Packet: rtppkt,
Data: enc,
Time: pts,
CompositionTime: pts,
Time: pts2,
TimeLegacy: pts,
CompositionTime: pts2,
Idx: g.VideoH265Index,
IsVideo: true,
IsAudio: false,
Expand Down
15 changes: 9 additions & 6 deletions machinery/src/capture/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
nextPkt.IsKeyFrame && (timestamp+recordingPeriod-now <= 0 || now-startRecording >= maxRecordingPeriod) {

// Write the last packet
ttime := convertPTS(pkt.Time)
ttime := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
Expand Down Expand Up @@ -242,7 +242,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}

ttime := convertPTS(pkt.Time)
ttime := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
Expand All @@ -261,7 +261,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
recordingStatus = "started"

} else if start {
ttime := convertPTS(pkt.Time)
ttime := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
Expand Down Expand Up @@ -337,7 +337,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat

log.Log.Info("capture.main.HandleRecordStream(motiondetection): Start motion based recording ")

var lastDuration time.Duration
var lastDuration int64
var lastRecordingTime int64

//var cws *cacheWriterSeeker
Expand Down Expand Up @@ -444,8 +444,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
start = true
}
if start {

ttime := convertPTS(pkt.Time)
ttime := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
Expand Down Expand Up @@ -695,3 +694,7 @@ func JpegImage(captureDevice *Capture, communication *models.Communication) imag
// convertPTS expresses the duration v as a whole number of milliseconds.
// Sub-millisecond remainders are dropped by Duration.Milliseconds, and the
// signed result is converted to uint64 unchanged (a negative duration wraps).
func convertPTS(v time.Duration) uint64 {
	ms := v.Milliseconds()
	return uint64(ms)
}

// convertPTS2 converts v to uint64 and then divides it by 100 using integer
// division, so the remainder is discarded. The conversion happens before the
// division, which means a negative v wraps around instead of yielding a
// small negative quotient.
// NOTE(review): the unit of v and the meaning of the divisor are not visible
// here; presumably v is an int64 packet timestamp - confirm against callers.
func convertPTS2(v int64) uint64 {
	unsigned := uint64(v)
	return unsigned / 100
}
2 changes: 1 addition & 1 deletion machinery/src/components/Kerberos.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
if subStreamEnabled && rtspSubClient != nil {
subQueue = packets.NewQueue()
communication.SubQueue = subQueue
subQueue.SetMaxGopCount(1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
subQueue.SetMaxGopCount(3) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room).
subQueue.WriteHeader(videoSubStreams)
go rtspSubClient.Start(context.Background(), "sub", subQueue, configuration, communication)

Expand Down
17 changes: 9 additions & 8 deletions machinery/src/packets/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
// Packet represents an RTP packet together with the stream metadata
// (audio/video flags, key-frame flag, stream index, codec) and the
// timestamps that the capture code attaches to it.
type Packet struct {
Packet *rtp.Packet
IsAudio bool // packet is audio
IsVideo bool // packet is video
IsKeyFrame bool // video packet is key frame
Idx int8 // stream index in container format
Codec string // codec name
CompositionTime time.Duration // packet presentation time minus decode time for H264 B-Frame
Time time.Duration // packet decode time
Data []byte // packet data
IsAudio bool // packet is audio
IsVideo bool // packet is video
IsKeyFrame bool // video packet is key frame
Idx int8 // stream index in container format
Codec string // codec name
CompositionTime int64 // packet presentation time minus decode time for H264 B-Frame
Time int64 // packet decode time
TimeLegacy time.Duration // timestamp obtained from Client.PacketPTS; the recording code converts it to milliseconds with convertPTS
Data []byte // packet data
}
3 changes: 1 addition & 2 deletions machinery/src/packets/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package packets
import (
"io"
"sync"
"time"
)

// time
Expand Down Expand Up @@ -145,7 +144,7 @@ func (self *Queue) Oldest() *QueueCursor {
}

// Create cursor position at specific time in buffered packets.
func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor {
func (self *Queue) DelayedTime(dur int64) *QueueCursor {
cursor := self.newCursor()
cursor.init = func(buf *Buf, videoidx int) BufPos {
i := buf.Tail - 1
Expand Down
18 changes: 10 additions & 8 deletions machinery/src/webrtc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C

var cursorError error
var pkt packets.Packet
var previousTimeVideo time.Duration
var previousTimeAudio time.Duration
//var previousTimeVideo int64
//var previousTimeAudio int64

start := false
receivedKeyFrame := false
Expand Down Expand Up @@ -401,15 +401,16 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
if pkt.IsVideo {

// Calculate the difference
bufferDuration := pkt.Time - previousTimeVideo
previousTimeVideo = pkt.Time
//bufferDuration := pkt.Time - previousTimeVideo
//previousTimeVideo = pkt.Time

// Start at the first keyframe
if pkt.IsKeyFrame {
start = true
}
if start {
sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration}
//bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
if config.Capture.ForwardWebRTC == "true" {
// We will send the video to a remote peer
// TODO..
Expand All @@ -432,11 +433,12 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
}

// Calculate the difference
bufferDuration := pkt.Time - previousTimeAudio
previousTimeAudio = pkt.Time
//bufferDuration := pkt.Time - previousTimeAudio
//previousTimeAudio = pkt.Time

// We will send the audio
sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration}
//bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
if err := audioTrack.WriteSample(sample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error())
}
Expand Down

0 comments on commit 9e3d705

Please sign in to comment.