Skip to content

Commit

Permalink
Add forward stats (livekit#2725)
Browse files Browse the repository at this point in the history
* Add forward metrics

* ignore packets was not forwarded

* rename
  • Loading branch information
cnderrauber authored May 24, 2024
1 parent a444f24 commit e6aa36f
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 32 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e
github.com/livekit/protocol v1.16.1-0.20240523171447-90dfd668f42f
github.com/livekit/protocol v1.16.1-0.20240524061435-6410d008bf7b
github.com/livekit/psrpc v0.5.3-0.20240426045048-8ba067a45715
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
Expand Down Expand Up @@ -66,7 +66,7 @@ require (
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/subcommands v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
Expand All @@ -80,7 +80,7 @@ require (
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mdlayher/netlink v1.7.1 // indirect
github.com/mdlayher/socket v0.4.0 // indirect
github.com/nats-io/nats.go v1.34.1 // indirect
github.com/nats-io/nats.go v1.35.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/datachannel v1.5.5 // indirect
Expand All @@ -105,7 +105,7 @@ require (
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect
google.golang.org/grpc v1.64.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
Expand Down Expand Up @@ -120,8 +120,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e h1:ss4VwrouYiDpuNJ9BUTH+WsW+GDdJS70iZp8ii3/0Lc=
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.16.1-0.20240523171447-90dfd668f42f h1:T4Twu81WbBQ7HyJWv3j/D8VJ//7M4yo4s3EmdPJqcGY=
github.com/livekit/protocol v1.16.1-0.20240523171447-90dfd668f42f/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/protocol v1.16.1-0.20240524061435-6410d008bf7b h1:TQOoMQqruWwgbhMoxY2ZCksge88NesmWSn8eBZuWKFs=
github.com/livekit/protocol v1.16.1-0.20240524061435-6410d008bf7b/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/psrpc v0.5.3-0.20240426045048-8ba067a45715 h1:vhDMOe8fxEc/amYTFo799LySPM12Fk3vc+Nc6o4gYZQ=
github.com/livekit/psrpc v0.5.3-0.20240426045048-8ba067a45715/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
Expand Down Expand Up @@ -153,8 +153,8 @@ github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw
github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk=
github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down Expand Up @@ -402,10 +402,10 @@ golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e h1:Elxv5MwEkCI9f5SkoL6afed6NTdxaGoAo39eANBwHL8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type RTCConfig struct {

// max number of bytes to buffer for data channel. 0 means unlimited
DataChannelMaxBufferedAmount uint64 `yaml:"data_channel_max_buffered_amount,omitempty"`

ForwardStats ForwardStatsConfig `yaml:"forward_stats,omitempty"`
}

type TURNServer struct {
Expand Down Expand Up @@ -318,6 +320,12 @@ type APIConfig struct {
MaxCheckInterval time.Duration `yaml:"max_check_interval,omitempty"`
}

type ForwardStatsConfig struct {
SummaryInterval time.Duration `yaml:"summary_interval,omitempty"`
ReportInterval time.Duration `yaml:"report_interval,omitempty"`
ReportWindow time.Duration `yaml:"report_window,omitempty"`
}

func DefaultAPIConfig() APIConfig {
return APIConfig{
ExecutionTimeout: 2 * time.Second,
Expand Down
2 changes: 2 additions & 0 deletions pkg/rtc/mediatrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type MediaTrackParams struct {
Logger logger.Logger
SimTracks map[uint32]SimulcastTrackInfo
OnRTCP func([]rtcp.Packet)
ForwardStats *sfu.ForwardStats
}

func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack {
Expand Down Expand Up @@ -281,6 +282,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
sfu.WithAudioConfig(t.params.AudioConfig),
sfu.WithLoadBalanceThreshold(20),
sfu.WithStreamTrackers(),
sfu.WithForwardStats(t.params.ForwardStats),
)
newWR.OnCloseHandler(func() {
t.MediaTrackReceiver.SetClosing()
Expand Down
2 changes: 2 additions & 0 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type ParticipantParams struct {
PlayoutDelay *livekit.PlayoutDelay
SyncStreams bool
EnableTrafficLoadTracking bool
ForwardStats *sfu.ForwardStats
}

type ParticipantImpl struct {
Expand Down Expand Up @@ -2126,6 +2127,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
PLIThrottleConfig: p.params.PLIThrottleConfig,
SimTracks: p.params.SimTracks,
OnRTCP: p.postRtcp,
ForwardStats: p.params.ForwardStats,
}, ti)

mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange)
Expand Down
10 changes: 10 additions & 0 deletions pkg/service/roommanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"golang.org/x/exp/maps"

"github.com/livekit/livekit-server/pkg/agent"
"github.com/livekit/livekit-server/pkg/sfu"
sutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/auth"
Expand Down Expand Up @@ -83,6 +84,8 @@ type RoomManager struct {
participantServers utils.MultitonService[rpc.ParticipantTopic]

iceConfigCache *sutils.IceConfigCache[iceConfigCacheKey]

forwardStats *sfu.ForwardStats
}

func NewLocalRoomManager(
Expand All @@ -97,6 +100,7 @@ func NewLocalRoomManager(
versionGenerator utils.TimedVersionGenerator,
turnAuthHandler *TURNAuthHandler,
bus psrpc.MessageBus,
forwardStats *sfu.ForwardStats,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf)
if err != nil {
Expand All @@ -116,6 +120,7 @@ func NewLocalRoomManager(
versionGenerator: versionGenerator,
turnAuthHandler: turnAuthHandler,
bus: bus,
forwardStats: forwardStats,

rooms: make(map[livekit.RoomName]*rtc.Room),

Expand Down Expand Up @@ -232,6 +237,10 @@ func (r *RoomManager) Stop() {
}

r.iceConfigCache.Stop()

if r.forwardStats != nil {
r.forwardStats.Stop()
}
}

// StartSession starts WebRTC session when a new participant is connected, takes place on RTC node
Expand Down Expand Up @@ -440,6 +449,7 @@ func (r *RoomManager) StartSession(
SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo,
PlayoutDelay: roomInternal.GetPlayoutDelay(),
SyncStreams: roomInternal.GetSyncStreams(),
ForwardStats: r.forwardStats,
})
if err != nil {
return err
Expand Down
11 changes: 10 additions & 1 deletion pkg/service/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
"github.com/redis/go-redis/v9"
"gopkg.in/yaml.v3"

"github.com/livekit/livekit-server/pkg/agent"
"github.com/livekit/livekit-server/pkg/clientconfiguration"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/agent"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
Expand All @@ -51,6 +52,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
createKeyProvider,
createWebhookNotifier,
createClientConfiguration,
createForwardStats,
routing.CreateRouter,
getRoomConf,
config.DefaultAPIConfig,
Expand Down Expand Up @@ -235,6 +237,13 @@ func getPSRPCClientParams(config rpc.PSRPCConfig, bus psrpc.MessageBus) rpc.Clie
return rpc.NewClientParams(config, bus, logger.GetLogger(), rpc.PSRPCMetricsObserver{})
}

func createForwardStats(conf *config.Config) *sfu.ForwardStats {
if conf.RTC.ForwardStats.SummaryInterval == 0 || conf.RTC.ForwardStats.ReportInterval == 0 || conf.RTC.ForwardStats.ReportWindow == 0 {
return nil
}
return sfu.NewForwardStats(conf.RTC.ForwardStats.SummaryInterval, conf.RTC.ForwardStats.ReportInterval, conf.RTC.ForwardStats.ReportWindow)
}

func newInProcessTurnServer(conf *config.Config, authHandler turn.AuthHandler) (*turn.Server, error) {
return NewTurnServer(conf, authHandler, false)
}
11 changes: 10 additions & 1 deletion pkg/service/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion pkg/sfu/downtrackspreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ func (d *DownTrackSpreader) HasDownTrack(subscriberID livekit.ParticipantID) boo
return ok
}

func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) {
func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) int {
downTracks := d.GetDownTracks()
if len(downTracks) == 0 {
return 0
}

threshold := uint64(d.params.Threshold)
if threshold == 0 {
threshold = 1000000
Expand All @@ -97,6 +101,7 @@ func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) {
// WriteRTP takes about 50µs on average, so we write to 2 down tracks per loop.
step := uint64(2)
utils.ParallelExec(downTracks, threshold, step, writer)
return len(downTracks)
}

func (d *DownTrackSpreader) DownTrackCount() int {
Expand Down
74 changes: 74 additions & 0 deletions pkg/sfu/forwardstats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package sfu

import (
"sync"
"sync/atomic"
"time"

"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/utils"
)

type ForwardStats struct {
lock sync.Mutex
lastLeftMs atomic.Int64
latency *utils.LatencyAggregate
closeCh chan struct{}
}

func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength time.Duration) *ForwardStats {
s := &ForwardStats{
latency: utils.NewLatencyAggregate(latencyUpdateInterval, latencyWindowLength),
closeCh: make(chan struct{}),
}

go s.report(reportInterval)
return s
}

func (s *ForwardStats) Update(arrival, left time.Time) {
leftMs := left.UnixMilli()
lastMs := s.lastLeftMs.Load()
if leftMs < lastMs || !s.lastLeftMs.CompareAndSwap(lastMs, leftMs) {
return
}

transit := left.Sub(arrival)
s.lock.Lock()
defer s.lock.Unlock()
s.latency.Update(time.Duration(arrival.UnixNano()), float64(transit))
}

func (s *ForwardStats) GetStats() (latency, jitter time.Duration) {
s.lock.Lock()
defer s.lock.Unlock()
w := s.latency.Summarize()
return time.Duration(w.Mean()), time.Duration(w.StdDev())
}

func (s *ForwardStats) GetLastStats(duration time.Duration) (latency, jitter time.Duration) {
s.lock.Lock()
defer s.lock.Unlock()
w := s.latency.SummarizeLast(duration)
return time.Duration(w.Mean()), time.Duration(w.StdDev())
}

func (s *ForwardStats) Stop() {
close(s.closeCh)
}

func (s *ForwardStats) report(reportInterval time.Duration) {
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()
for {
select {
case <-s.closeCh:
return
case <-ticker.C:
latency, jitter := s.GetLastStats(reportInterval)
latencySlow, jitterSlow := s.GetStats()
prometheus.RecordForwardJitter(uint32(jitter/time.Millisecond), uint32(jitterSlow/time.Millisecond))
prometheus.RecordForwardLatency(uint32(latency/time.Millisecond), uint32(latencySlow/time.Millisecond))
}
}
}
Loading

0 comments on commit e6aa36f

Please sign in to comment.