Skip to content

Commit

Permalink
Merge pull request #275 from hashicorp/jm/intent-queue-metric
Browse files Browse the repository at this point in the history
Add periodic emission of the memberlist broadcast queue depth metric
  • Loading branch information
jmurret authored Aug 29, 2022
2 parents e3a4ff1 + 501c9b0 commit e892776
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ type Config struct {

// MetricLabels is a map of optional labels to apply to all metrics emitted.
MetricLabels []metrics.Label

// QueueCheckInterval is the interval at which the depth of the broadcast
// queue is sampled and emitted as a metric.
QueueCheckInterval time.Duration
}

// ParseCIDRs returns a possibly empty list of all networks that have been parsed
Expand Down Expand Up @@ -322,6 +326,8 @@ func DefaultLANConfig() *Config {
HandoffQueueDepth: 1024,
UDPBufferSize: 1400,
CIDRsAllowed: nil, // same as allow all

QueueCheckInterval: 30 * time.Second,
}
}

Expand Down
15 changes: 15 additions & 0 deletions memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
go m.streamListen()
go m.packetListen()
go m.packetHandler()
go m.checkBroadcastQueueDepth()
return m, nil
}

Expand Down Expand Up @@ -776,3 +777,17 @@ func (m *Memberlist) changeNode(addr string, f func(*nodeState)) {
n := m.nodeMap[addr]
f(n)
}

// checkBroadcastQueueDepth periodically samples the number of queued
// broadcasts and emits it as the "memberlist.queue.broadcasts" sample. It
// blocks until shutdownCh is signalled, so it is meant to run in its own
// goroutine.
func (m *Memberlist) checkBroadcastQueueDepth() {
	// A Config that leaves QueueCheckInterval unset (zero) or negative would
	// make the timer fire immediately and turn this loop into a busy spin
	// that floods the metrics sink. Fall back to the value used by
	// DefaultLANConfig instead.
	interval := m.config.QueueCheckInterval
	if interval <= 0 {
		interval = 30 * time.Second
	}

	// Use a single ticker for the lifetime of the loop rather than
	// allocating a fresh timer on every iteration.
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			numq := m.broadcasts.NumQueued()
			metrics.AddSampleWithLabels([]string{"memberlist", "queue", "broadcasts"}, float32(numq), m.metricLabels)
		case <-m.shutdownCh:
			return
		}
	}
}
22 changes: 22 additions & 0 deletions memberlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,28 @@ func TestCreate_secretKeyEmpty(t *testing.T) {
}
}

// TestCreate_checkBroadcastQueueMetrics verifies that a running Memberlist
// emits the broadcast queue depth sample at the configured check interval.
func TestCreate_checkBroadcastQueueMetrics(t *testing.T) {
	sink := registerInMemorySink(t)
	c := DefaultLANConfig()
	c.QueueCheckInterval = 1 * time.Second
	c.BindAddr = getBindAddr().String()
	c.SecretKey = make([]byte, 0)

	m, err := Create(c)
	require.NoError(t, err)
	defer m.Shutdown()

	// Wait for a few check intervals so at least one sample is recorded.
	time.Sleep(3 * time.Second)

	intv := getIntervalMetrics(t, sink)
	sampleName := "consul.usage.test.memberlist.queue.broadcasts"

	// Check for presence explicitly: a missing key yields the zero value of
	// the map's element type, and in go-metrics that value reaches Count
	// through an embedded pointer that would be nil, so reading Count from a
	// missing entry would panic instead of reporting a clean test failure.
	actualSample, ok := intv.Samples[sampleName]
	if !ok || actualSample.Count == 0 {
		t.Fatalf("%s sample not taken", sampleName)
	}
}

func TestCreate_keyringOnly(t *testing.T) {
c := DefaultLANConfig()
c.BindAddr = getBindAddr().String()
Expand Down
19 changes: 19 additions & 0 deletions state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/armon/go-metrics"
iretry "github.com/hashicorp/memberlist/internal/retry"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -2393,3 +2394,21 @@ func testVerifyProtocolSingle(t *testing.T, A [][6]uint8, B [][6]uint8, expect b
t.Fatalf("bad:\nA: %v\nB: %v\nErr: %s", A, B, err)
}
}

// registerInMemorySink installs an in-memory go-metrics sink as the global
// metrics sink, using the "consul.usage.test" service name with hostname
// prefixing disabled, and returns the sink so the test can inspect what was
// emitted. The test fails immediately if the global sink cannot be installed.
func registerInMemorySink(t *testing.T) *metrics.InmemSink {
	t.Helper()
	// Only have a single interval for the test
	sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
	cfg := metrics.DefaultConfig("consul.usage.test")
	cfg.EnableHostname = false
	// Surface a failed registration here rather than letting later
	// assertions fail with a confusing "sample not taken" message.
	_, err := metrics.NewGlobal(cfg, sink)
	require.NoError(t, err)
	return sink
}

// getIntervalMetrics returns the single interval held by sink, failing the
// test if the sink does not contain exactly one interval.
func getIntervalMetrics(t *testing.T, sink *metrics.InmemSink) *metrics.IntervalMetrics {
	t.Helper()
	data := sink.Data()
	require.Len(t, data, 1)
	return data[0]
}

0 comments on commit e892776

Please sign in to comment.