Skip to content

Commit

Permalink
DC messages (#158)
Browse files Browse the repository at this point in the history
* Implement data channel message type

* Update client implementation

* Update godeltaprof

* Fix dc init race

* [MM-60561] RTC client metrics (#159)

* Client metrics

* Implement rtc stats monitor

* Remove unnecessary parenthesis
  • Loading branch information
streamer45 authored Oct 9, 2024
1 parent 3c0ace4 commit 1738e66
Show file tree
Hide file tree
Showing 14 changed files with 769 additions and 46 deletions.
7 changes: 6 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ type Client struct {

// WebRTC
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
dc atomic.Pointer[webrtc.DataChannel]
iceCh chan webrtc.ICECandidateInit
receivers map[string][]*webrtc.RTPReceiver
voiceSender *webrtc.RTPSender
screenTransceivers []*webrtc.RTPTransceiver
rtcMon *rtcMonitor

state int32

Expand Down Expand Up @@ -225,6 +226,10 @@ func (c *Client) emit(eventType EventType, ctx any) {
func (c *Client) close() {
atomic.StoreInt32(&c.state, clientStateClosed)

if c.rtcMon != nil {
c.rtcMon.Stop()
}

if c.pc != nil {
if err := c.pc.Close(); err != nil {
c.log.Error("failed to close peer connection", slog.String("err", err.Error()))
Expand Down
2 changes: 2 additions & 0 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Config struct {
// EnableDCSignaling controls whether the client should use data channels
// for signaling of media tracks.
EnableDCSignaling bool
// EnableRTCMonitor controls whether the RTC monitor component should be enabled.
EnableRTCMonitor bool

wsURL string
}
Expand Down
14 changes: 8 additions & 6 deletions client/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,17 +357,19 @@ func setupTestHelper(tb testing.TB, channelName string) *TestHelper {
}

th.adminClient, err = New(Config{
SiteURL: th.apiURL,
AuthToken: th.adminAPIClient.AuthToken,
ChannelID: channelID,
SiteURL: th.apiURL,
AuthToken: th.adminAPIClient.AuthToken,
ChannelID: channelID,
EnableRTCMonitor: true,
}, WithLogger(logger))
require.NoError(tb, err)
require.NotNil(tb, th.adminClient)

th.userClient, err = New(Config{
SiteURL: th.apiURL,
AuthToken: th.userAPIClient.AuthToken,
ChannelID: channelID,
SiteURL: th.apiURL,
AuthToken: th.userAPIClient.AuthToken,
ChannelID: channelID,
EnableRTCMonitor: true,
}, WithLogger(logger))
require.NoError(tb, err)
require.NotNil(tb, th.userClient)
Expand Down
164 changes: 140 additions & 24 deletions client/rtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ import (
"io"
"log/slog"
"sync/atomic"
"time"

"github.com/mattermost/rtcd/service/rtc/dc"

"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/stats"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
)
Expand All @@ -23,8 +27,10 @@ const (
signalMsgOffer = "offer"
signalMsgAnswer = "answer"

iceChSize = 20
receiveMTU = 1460
iceChSize = 20
receiveMTU = 1460
rtcMonitorInterval = 4 * time.Second
pingInterval = time.Second
)

var (
Expand Down Expand Up @@ -140,13 +146,19 @@ func (c *Client) handleOffer(sdp string) error {
return fmt.Errorf("failed to set local description: %w", err)
}

if c.cfg.EnableDCSignaling && c.dc.ReadyState() == webrtc.DataChannelStateOpen {
if dataCh := c.dc.Load(); c.cfg.EnableDCSignaling && dataCh != nil && dataCh.ReadyState() == webrtc.DataChannelStateOpen {
c.log.Debug("sending answer through dc")
data, err := json.Marshal(answer)
if err != nil {
return fmt.Errorf("failed to encode answer: %w", err)
return fmt.Errorf("failed to marshal answer: %w", err)
}

msg, err := dc.EncodeMessage(dc.MessageTypeSDP, data)
if err != nil {
return fmt.Errorf("failed to encode dc message: %w", err)
}
return c.dc.SendText(string(data))

return dataCh.Send(msg)
}

if c.cfg.EnableDCSignaling {
Expand Down Expand Up @@ -178,6 +190,17 @@ func (c *Client) initRTCSession() error {
}

i := interceptor.Registry{}

statsInterceptorFactory, err := stats.NewInterceptor()
if err != nil {
return fmt.Errorf("failed to create stats interceptor: %w", err)
}
var statsGetter stats.Getter
statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
statsGetter = g
})
i.Add(statsInterceptorFactory)

if err := webrtc.RegisterDefaultInterceptors(&m, &i); err != nil {
return fmt.Errorf("failed to register default interceptors: %w", err)
}
Expand All @@ -198,11 +221,13 @@ func (c *Client) initRTCSession() error {
c.pc = pc
c.mut.Unlock()

dc, err := pc.CreateDataChannel("calls-dc", nil)
if err != nil {
return fmt.Errorf("failed to create data channel: %w", err)
rtcMon := newRTCMonitor(c.log, pc, statsGetter, rtcMonitorInterval)
if c.cfg.EnableRTCMonitor {
c.mut.Lock()
c.rtcMon = rtcMon
c.mut.Unlock()
rtcMon.Start()
}
c.dc = dc

pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
if candidate == nil {
Expand Down Expand Up @@ -324,14 +349,21 @@ func (c *Client) initRTCSession() error {
return
}

if c.cfg.EnableDCSignaling && c.dc.ReadyState() == webrtc.DataChannelStateOpen {
if dataCh := c.dc.Load(); c.cfg.EnableDCSignaling && dataCh != nil && dataCh.ReadyState() == webrtc.DataChannelStateOpen {
c.log.Debug("sending offer through dc")
data, err := json.Marshal(offer)
if err != nil {
c.log.Error("failed to encode offer", slog.String("err", err.Error()))
c.log.Error("failed to marshal offer", slog.String("err", err.Error()))
return
}

msg, err := dc.EncodeMessage(dc.MessageTypeSDP, data)
if err != nil {
c.log.Error("failed to encode dc message", slog.String("err", err.Error()))
return
}
if err := c.dc.SendText(string(data)); err != nil {

if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send on dc", slog.String("err", err.Error()))
}
} else {
Expand All @@ -357,23 +389,107 @@ func (c *Client) initRTCSession() error {
}
})

dc.OnMessage(func(msg webrtc.DataChannelMessage) {
var sdp webrtc.SessionDescription
if err := json.Unmarshal(msg.Data, &sdp); err != nil {
c.log.Error("failed to unmarshal sdp", slog.String("err", err.Error()))
return
// DC creation must happen after OnNegotiationNeeded has been registered
// to avoid races between dc initialization and initial offer.
dataCh, err := pc.CreateDataChannel("calls-dc", nil)
if err != nil {
return fmt.Errorf("failed to create data channel: %w", err)
}
c.dc.Store(dataCh)

lastPingTS := new(int64)
lastRTT := new(int64)
go func() {
pingTicker := time.NewTicker(pingInterval)
for {
select {
case <-pingTicker.C:
msg, err := dc.EncodeMessage(dc.MessageTypePing, nil)
if err != nil {
c.log.Error("failed to encode ping msg", slog.String("err", err.Error()))
continue
}

if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send ping msg", slog.String("err", err.Error()))
continue
}

atomic.StoreInt64(lastPingTS, time.Now().UnixMilli())
case stats := <-rtcMon.StatsCh():
c.log.Debug("rtc stats",
slog.Float64("lossRate", stats.lossRate),
slog.Int64("rtt", atomic.LoadInt64(lastRTT)),
slog.Float64("jitter", stats.jitter))

if stats.lossRate >= 0 {
msg, err := dc.EncodeMessage(dc.MessageTypeLossRate, stats.lossRate)
if err != nil {
c.log.Error("failed to encode loss rate msg", slog.String("err", err.Error()))
} else {
if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send loss rate msg", slog.String("err", err.Error()))
}
}
}

if rtt := atomic.LoadInt64(lastRTT); rtt > 0 {
msg, err := dc.EncodeMessage(dc.MessageTypeRoundTripTime, float64(rtt/1000))
if err != nil {
c.log.Error("failed to encode rtt msg", slog.String("err", err.Error()))
} else {
if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send rtt msg", slog.String("err", err.Error()))
}
}
}

if stats.jitter > 0 {
msg, err := dc.EncodeMessage(dc.MessageTypeJitter, stats.jitter)
if err != nil {
c.log.Error("failed to encode jitter msg", slog.String("err", err.Error()))
} else {
if err := dataCh.Send(msg); err != nil {
c.log.Error("failed to send jitter msg", slog.String("err", err.Error()))
}
}
}
case <-c.wsCloseCh:
return
}
}
}()

c.log.Debug("received sdp through DC", slog.String("sdp", sdp.SDP))
dataCh.OnMessage(func(msg webrtc.DataChannelMessage) {
mt, payload, err := dc.DecodeMessage(msg.Data)
if err != nil {
c.log.Error("failed to decode dc message", slog.String("err", err.Error()))
return
}

if sdp.Type == webrtc.SDPTypeOffer {
if err := c.handleOffer(sdp.SDP); err != nil {
c.log.Error("failed to offer", slog.String("err", err.Error()))
switch mt {
case dc.MessageTypePong:
if ts := atomic.LoadInt64(lastPingTS); ts > 0 {
atomic.StoreInt64(lastRTT, time.Now().UnixMilli()-ts)
}
case dc.MessageTypeSDP:
var sdp webrtc.SessionDescription
if err := json.Unmarshal(payload.([]byte), &sdp); err != nil {
c.log.Error("failed to unmarshal sdp", slog.String("err", err.Error()))
return
}
} else if sdp.Type == webrtc.SDPTypeAnswer {
if err := c.handleAnswer(sdp.SDP); err != nil {
c.log.Error("failed to answer", slog.String("err", err.Error()))
c.log.Debug("received sdp through DC", slog.String("sdp", sdp.SDP))
if sdp.Type == webrtc.SDPTypeOffer {
if err := c.handleOffer(sdp.SDP); err != nil {
c.log.Error("failed to offer", slog.String("err", err.Error()))
}
} else if sdp.Type == webrtc.SDPTypeAnswer {
if err := c.handleAnswer(sdp.SDP); err != nil {
c.log.Error("failed to answer", slog.String("err", err.Error()))
}
}
default:
c.log.Error("unexpected dc message type", slog.Any("mt", mt))
}
})

Expand Down
Loading

0 comments on commit 1738e66

Please sign in to comment.