Skip to content

Commit

Permalink
Implement rtc stats monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 committed Oct 7, 2024
1 parent e1c9f96 commit c2b2c0b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 2 deletions.
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type Client struct {
receivers map[string][]*webrtc.RTPReceiver
voiceSender *webrtc.RTPSender
screenTransceivers []*webrtc.RTPTransceiver
rtcMon *rtcMonitor

Check failure on line 107 in client/client.go

View workflow job for this annotation

GitHub Actions / lint

undefined: rtcMonitor

state int32

Expand Down Expand Up @@ -225,6 +226,10 @@ func (c *Client) emit(eventType EventType, ctx any) {
func (c *Client) close() {
atomic.StoreInt32(&c.state, clientStateClosed)

if c.rtcMon != nil {
c.rtcMon.Stop()
}

if c.pc != nil {
if err := c.pc.Close(); err != nil {
c.log.Error("failed to close peer connection", slog.String("err", err.Error()))
Expand Down
87 changes: 85 additions & 2 deletions client/rtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"io"
"log/slog"
"sync/atomic"
"time"

"github.com/mattermost/rtcd/service/rtc/dc"

"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/stats"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
)
Expand All @@ -25,8 +27,10 @@ const (
signalMsgOffer = "offer"
signalMsgAnswer = "answer"

iceChSize = 20
receiveMTU = 1460
iceChSize = 20
receiveMTU = 1460
rtcMonitorInterval = 4 * time.Second
pingInterval = time.Second
)

var (
Expand Down Expand Up @@ -186,6 +190,17 @@ func (c *Client) initRTCSession() error {
}

i := interceptor.Registry{}

statsInterceptorFactory, err := stats.NewInterceptor()
if err != nil {
return fmt.Errorf("failed to create stats interceptor: %w", err)
}
var statsGetter stats.Getter
statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
statsGetter = g
})
i.Add(statsInterceptorFactory)

if err := webrtc.RegisterDefaultInterceptors(&m, &i); err != nil {
return fmt.Errorf("failed to register default interceptors: %w", err)
}
Expand All @@ -206,6 +221,13 @@ func (c *Client) initRTCSession() error {
c.pc = pc
c.mut.Unlock()

rtcMon := newRTCMonitor(c.log, pc, statsGetter, rtcMonitorInterval)

Check failure on line 224 in client/rtc.go

View workflow job for this annotation

GitHub Actions / lint

undefined: newRTCMonitor (typecheck)
rtcMon.Start()

c.mut.Lock()
c.rtcMon = rtcMon
c.mut.Unlock()

pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
if candidate == nil {
c.log.Debug("local ICE gathering completed")
Expand Down Expand Up @@ -374,6 +396,64 @@ func (c *Client) initRTCSession() error {
}
c.dc.Store(dataCh)

lastPingTS := new(int64)
lastRTT := new(int64)
go func() {
pingTicker := time.NewTicker(pingInterval)
for {
select {
case <-pingTicker.C:
msg, err := dc.EncodeMessage(dc.MessageTypePing, nil)
if err != nil {
c.log.Error("failed to encode ping msg", slog.String("err", err.Error()))
continue
}

if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send ping msg", slog.String("err", err.Error()))
continue
}

atomic.StoreInt64(lastPingTS, time.Now().UnixMilli())
case stats := <-rtcMon.StatsCh():
if stats.lossRate >= 0 {
msg, err := dc.EncodeMessage(dc.MessageTypeLossRate, stats.lossRate)
if err != nil {
c.log.Error("failed to encode loss rate msg", slog.String("err", err.Error()))
} else {
if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send loss rate msg", slog.String("err", err.Error()))
}
}
}

if rtt := atomic.LoadInt64(lastRTT); rtt > 0 {
msg, err := dc.EncodeMessage(dc.MessageTypeRoundTripTime, rtt)
if err != nil {
c.log.Error("failed to encode rtt msg", slog.String("err", err.Error()))
} else {
if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send rtt msg", slog.String("err", err.Error()))
}
}
}

if stats.jitter > 0 {
msg, err := dc.EncodeMessage(dc.MessageTypeJitter, stats.jitter)
if err != nil {
c.log.Error("failed to encode jitter msg", slog.String("err", err.Error()))
} else {
if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send jitter msg", slog.String("err", err.Error()))
}
}
}
case <-c.wsCloseCh:
return
}
}
}()

dataCh.OnMessage(func(msg webrtc.DataChannelMessage) {
mt, payload, err := dc.DecodeMessage(msg.Data)
if err != nil {
Expand All @@ -383,6 +463,9 @@ func (c *Client) initRTCSession() error {

switch mt {
case dc.MessageTypePong:
if ts := atomic.LoadInt64(lastPingTS); ts > 0 {
atomic.StoreInt64(lastRTT, time.Now().UnixMilli()-ts)
}
case dc.MessageTypeSDP:
var sdp webrtc.SessionDescription
if err := json.Unmarshal(payload.([]byte), &sdp); err != nil {
Expand Down

0 comments on commit c2b2c0b

Please sign in to comment.