Skip to content

Commit

Permalink
Fix for config updating and tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdurham committed Sep 20, 2024
1 parent a5cf26d commit 9e84aee
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ lint: alloylint
# final command runs tests for all other submodules.
test:
$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/)
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization ./internal/component/prometheus/remote/queue/network
$(GO_ENV) find . -name go.mod -not -path "./go.mod" -execdir go test -race ./... \;

test-packages:
Expand Down
24 changes: 0 additions & 24 deletions internal/component/prometheus/remote/queue/network/config.go

This file was deleted.

4 changes: 2 additions & 2 deletions internal/component/prometheus/remote/queue/network/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type loop struct {
client *http.Client
batchCount int
flushFrequency time.Duration
cfg ConnectionConfig
cfg types.ConnectionConfig
log log.Logger
lastSend time.Time
statsFunc func(s types.NetworkStats)
Expand All @@ -46,7 +46,7 @@ type loop struct {
sendBuffer []byte
}

func newLoop(cc ConnectionConfig, isMetaData bool, log log.Logger, stats func(s types.NetworkStats)) *loop {
func newLoop(cc types.ConnectionConfig, isMetaData bool, log log.Logger, stats func(s types.NetworkStats)) *loop {
// TODO @mattdurham add TLS support afer the initial push.
l := &loop{
isMeta: isMetaData,
Expand Down
61 changes: 35 additions & 26 deletions internal/component/prometheus/remote/queue/network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type manager struct {
logger log.Logger
inbox actor.Mailbox[*types.TimeSeriesBinary]
metaInbox actor.Mailbox[*types.TimeSeriesBinary]
configInbox actor.Mailbox[ConnectionConfig]
configInbox actor.Mailbox[types.ConnectionConfig]
self actor.Actor
cfg ConnectionConfig
cfg types.ConnectionConfig
stats func(types.NetworkStats)
metaStats func(types.NetworkStats)
}
Expand All @@ -29,17 +29,18 @@ var _ types.NetworkClient = (*manager)(nil)

var _ actor.Worker = (*manager)(nil)

func New(cc ConnectionConfig, logger log.Logger, seriesStats, metadataStats func(types.NetworkStats)) (types.NetworkClient, error) {
func New(cc types.ConnectionConfig, logger log.Logger, seriesStats, metadataStats func(types.NetworkStats)) (types.NetworkClient, error) {
s := &manager{
connectionCount: cc.Connections,
loops: make([]*loop, 0),
loops: make([]*loop, 0, cc.Connections),
logger: logger,
// This provides blocking to only handle one at a time, so that if a queue blocks
// it will stop the filequeue from feeding more.
inbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)),
metaInbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)),
configInbox: actor.NewMailbox[ConnectionConfig](),
configInbox: actor.NewMailbox[types.ConnectionConfig](),
stats: seriesStats,
cfg: cc,
}

// start kicks off a number of concurrent connections.
Expand All @@ -56,16 +57,11 @@ func New(cc ConnectionConfig, logger log.Logger, seriesStats, metadataStats func
}

func (s *manager) Start() {
actors := make([]actor.Actor, 0)
for _, l := range s.loops {
l.Start()
}
actors = append(actors, s.metadata.actors()...)
actors = append(actors, s.inbox)
actors = append(actors, s.metaInbox)
actors = append(actors, actor.New(s))
actors = append(actors, s.configInbox)
s.self = actor.Combine(actors...).Build()
s.startLoops()
s.configInbox.Start()
s.metaInbox.Start()
s.inbox.Start()
s.self = actor.New(s)
s.self.Start()
}

Expand All @@ -77,7 +73,7 @@ func (s *manager) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary
return s.metaInbox.Send(ctx, data)
}

func (s *manager) UpdateConfig(ctx context.Context, cc ConnectionConfig) error {
func (s *manager) UpdateConfig(ctx context.Context, cc types.ConnectionConfig) error {
return s.configInbox.Send(ctx, cc)
}

Expand Down Expand Up @@ -114,40 +110,53 @@ func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus {
}
}

func (s *manager) updateConfig(cc ConnectionConfig) {
func (s *manager) updateConfig(cc types.ConnectionConfig) {
// No need to do anything if the configuration is the same.
if s.cfg.Equals(cc) {
return
}
s.cfg = cc
// TODO @mattdurham make this smarter, at the moment any samples in the loops are lost.
// Ideally we would drain the queues and re add them but that is a future need.
// In practice this shouldn't change often so data loss should be minimal.
// For the moment we will stop all the items and recreate them.
for _, l := range s.loops {
l.Stop()
}
s.metadata.Stop()

s.loops = make([]*loop, 0)
s.stopLoops()
s.loops = make([]*loop, 0, s.connectionCount)
var i uint64
for ; i < s.connectionCount; i++ {
l := newLoop(cc, false, s.logger, s.stats)
l.self = actor.New(l)

s.loops = append(s.loops, l)
}

s.metadata = newLoop(cc, true, s.logger, s.metaStats)
s.metadata.self = actor.New(s.metadata)
s.startLoops()
}

func (s *manager) Stop() {
level.Debug(s.logger).Log("msg", "stopping manager")
s.stopLoops()
s.configInbox.Stop()
s.metaInbox.Stop()
s.inbox.Stop()
s.self.Stop()
}

func (s *manager) stopLoops() {
for _, l := range s.loops {
l.Stop()
l.stopCalled.Store(true)
l.Stop()
}
s.metadata.stopCalled.Store(true)
s.self.Stop()
s.metadata.Stop()
}

func (s *manager) startLoops() {
for _, l := range s.loops {
l.Start()
}
s.metadata.Start()
}

// Queue adds anything thats not metadata to the queue.
Expand Down
Loading

0 comments on commit 9e84aee

Please sign in to comment.