Skip to content

Commit

Permalink
fix(sfu): Fix issues regarding nack,buckets and racing (#449)
Browse files Browse the repository at this point in the history
* feat(buffer): Remove channel

* fix(buffer): fix racing on rtcp reader

* fix(sfu): Fix racing issues closes #431,closes #439

* fix(buffer): Close queue on buffer close

* fix(sfu): fix racing issue on simulcast downtrack

* fix(sfu): fix racing issue on layer change

* fix(queue): Dont discard disordered packets

* fix(buffer): Lower queue size for audio streams

* fix(sfu): downtrack simulcast max layer racing

* fix(buffer): Dont make factory global

* fix(buffer): Make packet tracking configurable

* fix(sfu): Update rtx downtrack sender stats

* fix(sfu): fix tests

* fix(sfu): gofmt project
  • Loading branch information
OrlandoCo authored Mar 1, 2021
1 parent cca2a9e commit f6149bf
Show file tree
Hide file tree
Showing 17 changed files with 269 additions and 145 deletions.
6 changes: 3 additions & 3 deletions cmd/signal/allrpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func parse() bool {
}

func getEnv(key string) string {
if value, exists := os.LookupEnv(key); exists {
if value, exists := os.LookupEnv(key); exists {
return value
}
}

return ""
return ""
}

func main() {
Expand Down
4 changes: 2 additions & 2 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ withstats = false
# Limit the remb bandwidth in kbps
# zero means no limits
maxbandwidth = 1500
# max buffer time by ms for video tracks
maxbuffertime = 1000
# max number of video tracks packets the SFU will keep track
maxpackettrack = 500
# Sets the audio level volume threshold.
# Values from [0-127] where 0 is the loudest.
# Audio levels are read from rtp extension header according to:
Expand Down
107 changes: 107 additions & 0 deletions pkg/buffer/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package buffer

import (
"encoding/binary"
"math"
)

const maxPktSize = 1350

type Bucket struct {
buf []byte

headSN uint16
step int
maxSteps int
}

func NewBucket(buf []byte) *Bucket {
return &Bucket{
buf: buf,
maxSteps: int(math.Floor(float64(len(buf))/float64(maxPktSize))) - 1,
}
}

func (b *Bucket) AddPacket(pkt []byte, sn uint16, latest bool) ([]byte, error) {
if !latest {
return b.set(sn, pkt)
}
diff := sn - b.headSN
b.headSN = sn
for i := uint16(1); i < diff; i++ {
b.step++
if b.step >= b.maxSteps {
b.step = 0
}
}
return b.push(pkt), nil
}

func (b *Bucket) GetPacket(buf []byte, sn uint16) (i int, err error) {
p := b.get(sn)
if p == nil {
err = errPacketNotFound
return
}
i = len(p)
if cap(buf) < i {
err = errBufferTooSmall
return
}
if len(buf) < i {
buf = buf[:i]
}
copy(buf, p)
return
}

func (b *Bucket) push(pkt []byte) []byte {
binary.BigEndian.PutUint16(b.buf[b.step*maxPktSize:], uint16(len(pkt)))
off := b.step*maxPktSize + 2
copy(b.buf[off:], pkt)
b.step++
if b.step > b.maxSteps {
b.step = 0
}
return b.buf[off : off+len(pkt)]
}

func (b *Bucket) get(sn uint16) []byte {
pos := b.step - int(b.headSN-sn+1)
if pos < 0 {
if pos*-1 > b.maxSteps+1 {
return nil
}
pos = b.maxSteps + pos + 1
}
off := pos * maxPktSize
if off > len(b.buf) {
return nil
}
if binary.BigEndian.Uint16(b.buf[off+4:off+6]) != sn {
return nil
}
sz := int(binary.BigEndian.Uint16(b.buf[off : off+2]))
return b.buf[off+2 : off+2+sz]
}

func (b *Bucket) set(sn uint16, pkt []byte) ([]byte, error) {
if b.headSN-sn >= uint16(b.maxSteps+1) {
return nil, errPacketTooOld
}
pos := b.step - int(b.headSN-sn+1)
if pos < 0 {
pos = b.maxSteps + pos + 1
}
off := pos * maxPktSize
if off > len(b.buf) || off < 0 {
return nil, errPacketTooOld
}
// Do not overwrite if packet exist
if binary.BigEndian.Uint16(b.buf[off+4:off+6]) == sn {
return b.buf[off+2 : off+2+len(pkt)], nil
}
binary.BigEndian.PutUint16(b.buf[off:], uint16(len(pkt)))
copy(b.buf[off+2:], pkt)
return b.buf[off+2 : off+2+len(pkt)], nil
}
62 changes: 36 additions & 26 deletions pkg/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/gammazero/deque"

log "github.com/pion/ion-log"
"github.com/pion/rtcp"
"github.com/pion/rtp"
Expand Down Expand Up @@ -40,22 +39,23 @@ type ExtPacket struct {
// Buffer contains all packets
type Buffer struct {
sync.Mutex
pool *sync.Pool
nacker *nackQueue
packetQueue *PacketQueue
codecType webrtc.RTPCodecType
extPackets deque.Deque
pPackets []pendingPackets
closeOnce sync.Once
mediaSSRC uint32
clockRate uint32
maxBitrate uint64
lastReport int64
twccExt uint8
audioExt uint8
bound bool
closed atomicBool
mime string
bucket *Bucket
nacker *nackQueue
videoPool *sync.Pool
audioPool *sync.Pool
codecType webrtc.RTPCodecType
extPackets deque.Deque
pPackets []pendingPackets
closeOnce sync.Once
mediaSSRC uint32
clockRate uint32
maxBitrate uint64
lastReport int64
twccExt uint8
audioExt uint8
bound bool
closed atomicBool
mime string

// supported feedbacks
remb bool
Expand Down Expand Up @@ -101,15 +101,15 @@ type Stats struct {

// BufferOptions provides configuration options for the buffer
type Options struct {
BufferTime int
MaxBitRate uint64
}

// NewBuffer constructs a new Buffer
func NewBuffer(ssrc uint32, pp *sync.Pool) *Buffer {
func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer {
b := &Buffer{
mediaSSRC: ssrc,
pool: pp,
videoPool: vp,
audioPool: ap,
}
b.extPackets.SetMinCapacity(7)
return b
Expand All @@ -126,8 +126,10 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, o Options) {
switch {
case strings.HasPrefix(b.mime, "audio/"):
b.codecType = webrtc.RTPCodecTypeAudio
b.bucket = NewBucket(b.audioPool.Get().([]byte))
case strings.HasPrefix(b.mime, "video/"):
b.codecType = webrtc.RTPCodecTypeVideo
b.bucket = NewBucket(b.videoPool.Get().([]byte))
default:
b.codecType = webrtc.RTPCodecType(0)
}
Expand All @@ -140,7 +142,6 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, o Options) {
}

if b.codecType == webrtc.RTPCodecTypeVideo {
b.packetQueue = NewPacketQueue(b.pool, 500)
for _, fb := range codec.RTCPFeedback {
switch fb.Type {
case webrtc.TypeRTCPFBGoogREMB:
Expand All @@ -156,7 +157,6 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, o Options) {
}
}
} else if b.codecType == webrtc.RTPCodecTypeAudio {
b.packetQueue = NewPacketQueue(b.pool, 50)
for _, h := range params.HeaderExtensions {
if h.URI == sdp.AudioLevelURI {
b.audioLevel = true
Expand Down Expand Up @@ -244,9 +244,14 @@ func (b *Buffer) Close() error {
defer b.Unlock()

b.closeOnce.Do(func() {
if b.bucket != nil && b.codecType == webrtc.RTPCodecTypeVideo {
b.videoPool.Put(b.bucket.buf)
}
if b.bucket != nil && b.codecType == webrtc.RTPCodecTypeAudio {
b.audioPool.Put(b.bucket.buf)
}
b.closed.set(true)
b.onClose()
b.packetQueue.Close()
})
return nil
}
Expand All @@ -261,7 +266,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) {
if b.stats.PacketCount == 0 {
b.baseSN = sn
b.maxSeqNo = sn
b.packetQueue.headSN = sn - 1
b.bucket.headSN = sn - 1
b.lastReport = arrivalTime
} else if (sn-b.maxSeqNo)&0x8000 == 0 {
if sn < b.maxSeqNo {
Expand Down Expand Up @@ -296,7 +301,12 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) {
b.stats.PacketCount++

var p rtp.Packet
if err := p.Unmarshal(b.packetQueue.AddPacket(pkt, sn, sn == b.maxSeqNo)); err != nil {
pb, err := b.bucket.AddPacket(pkt, sn, sn == b.maxSeqNo)
if err != nil {
log.Errorf("buffer write err: %v", err)
return
}
if err = p.Unmarshal(pb); err != nil {
return
}

Expand Down Expand Up @@ -493,7 +503,7 @@ func (b *Buffer) GetPacket(buff []byte, sn uint16) (int, error) {
if b.closed.get() {
return 0, io.EOF
}
return b.packetQueue.GetPacket(buff, sn)
return b.bucket.GetPacket(buff, sn)
}

// Bitrate returns the current publisher stream bitrate.
Expand Down
3 changes: 1 addition & 2 deletions pkg/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func TestNewBuffer(t *testing.T) {
name: "Must not be nil and add packets in sequence",
args: args{
options: Options{
BufferTime: 1000,
MaxBitRate: 1e6,
},
},
Expand Down Expand Up @@ -93,7 +92,7 @@ func TestNewBuffer(t *testing.T) {
return make([]byte, 1500)
},
}
buff := NewBuffer(123, pool)
buff := NewBuffer(123, pool, pool)
buff.codecType = webrtc.RTPCodecTypeVideo
assert.NotNil(t, buff)
assert.NotNil(t, TestPackets)
Expand Down
2 changes: 1 addition & 1 deletion pkg/buffer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import "errors"
var (
errPacketNotFound = errors.New("packet not found in cache")
errBufferTooSmall = errors.New("buffer too small")
errExtNotFound = errors.New("ext not found")
errPacketTooOld = errors.New("received packet too old")
)
16 changes: 11 additions & 5 deletions pkg/buffer/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@ import (

type Factory struct {
sync.RWMutex
packetPool *sync.Pool
videoPool *sync.Pool
audioPool *sync.Pool
rtpBuffers map[uint32]*Buffer
rtcpReaders map[uint32]*RTCPReader
}

func NewBufferFactory() *Factory {
func NewBufferFactory(trackingPackets int) *Factory {
return &Factory{
packetPool: &sync.Pool{
videoPool: &sync.Pool{
New: func() interface{} {
return make([]byte, 1500)
return make([]byte, trackingPackets*maxPktSize)
},
},
audioPool: &sync.Pool{
New: func() interface{} {
return make([]byte, maxPktSize*25)
},
},
rtpBuffers: make(map[uint32]*Buffer),
Expand Down Expand Up @@ -46,7 +52,7 @@ func (f *Factory) GetOrNew(packetType packetio.BufferPacketType, ssrc uint32) io
if reader, ok := f.rtpBuffers[ssrc]; ok {
return reader
}
buffer := NewBuffer(ssrc, f.packetPool)
buffer := NewBuffer(ssrc, f.videoPool, f.audioPool)
f.rtpBuffers[ssrc] = buffer
buffer.OnClose(func() {
f.Lock()
Expand Down
Loading

0 comments on commit f6149bf

Please sign in to comment.