Fix deadlock in manager tests.
mattdurham committed Oct 8, 2024
1 parent 6f9a820 commit c78ea1d
Showing 3 changed files with 52 additions and 17 deletions.
48 changes: 40 additions & 8 deletions internal/component/prometheus/remote/queue/network/manager.go
@@ -2,11 +2,9 @@ package network

import (
"context"

"github.com/grafana/alloy/internal/runtime/logging/level"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/vladopajic/go-actor/actor"
)

@@ -17,13 +15,19 @@ type manager struct {
logger log.Logger
inbox actor.Mailbox[*types.TimeSeriesBinary]
metaInbox actor.Mailbox[*types.TimeSeriesBinary]
configInbox actor.Mailbox[types.ConnectionConfig]
configInbox actor.Mailbox[configCallback]
self actor.Actor
cfg types.ConnectionConfig
stats func(types.NetworkStats)
metaStats func(types.NetworkStats)
}

// configCallback allows a config update to be applied synchronously.
type configCallback struct {
cc types.ConnectionConfig
ch chan struct{}
}

var _ types.NetworkClient = (*manager)(nil)

var _ actor.Worker = (*manager)(nil)
@@ -36,7 +40,7 @@ func New(cc types.ConnectionConfig, logger log.Logger, seriesStats, metadataStat
// it will stop the filequeue from feeding more.
inbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)),
metaInbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)),
configInbox: actor.NewMailbox[types.ConnectionConfig](),
configInbox: actor.NewMailbox[configCallback](),
stats: seriesStats,
metaStats: metadataStats,
cfg: cc,
@@ -72,39 +76,66 @@ func (s *manager) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary
}

func (s *manager) UpdateConfig(ctx context.Context, cc types.ConnectionConfig) error {
return s.configInbox.Send(ctx, cc)
ch := make(chan struct{})
defer close(ch)
err := s.configInbox.Send(ctx, configCallback{
cc: cc,
ch: ch,
})
if err != nil {
return err
}
<-ch
return nil
}

func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus {
// This acts as a priority queue: always check for configuration changes first.
select {
case cfg, ok := <-s.configInbox.ReceiveC():
if !ok {
level.Debug(s.logger).Log("msg", "config inbox closed")
return actor.WorkerEnd
}
s.updateConfig(cfg)
s.updateConfig(cfg.cc)
// Notify the caller we have applied the config.
cfg.ch <- struct{}{}
return actor.WorkerContinue
default:
}

// main work queue.
select {
case <-ctx.Done():
s.Stop()
return actor.WorkerEnd
case ts, ok := <-s.inbox.ReceiveC():
if !ok {
level.Debug(s.logger).Log("msg", "series inbox closed")
return actor.WorkerEnd
}
s.queue(ctx, ts)
return actor.WorkerContinue
case ts, ok := <-s.metaInbox.ReceiveC():
if !ok {
level.Debug(s.logger).Log("msg", "meta inbox closed")
return actor.WorkerEnd
}
err := s.metadata.seriesMbx.Send(ctx, ts)
if err != nil {
level.Error(s.logger).Log("msg", "failed to send to metadata loop", "err", err)
}
return actor.WorkerContinue
// We also need to check the config inbox here, else it's possible this will deadlock.
case cfg, ok := <-s.configInbox.ReceiveC():
if !ok {
level.Debug(s.logger).Log("msg", "config inbox closed")
return actor.WorkerEnd
}
s.updateConfig(cfg.cc)
// Notify the caller we have applied the config.
cfg.ch <- struct{}{}
return actor.WorkerContinue
}
}

@@ -124,13 +155,14 @@ func (s *manager) updateConfig(cc types.ConnectionConfig) {
for i := uint(0); i < s.cfg.Connections; i++ {
l := newLoop(cc, false, s.logger, s.stats)
l.self = actor.New(l)

s.loops = append(s.loops, l)
}

s.metadata = newLoop(cc, true, s.logger, s.metaStats)
s.metadata.self = actor.New(s.metadata)
level.Debug(s.logger).Log("msg", "starting loops")
s.startLoops()
level.Debug(s.logger).Log("msg", "loops started")
}

func (s *manager) Stop() {
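The fix has two parts: UpdateConfig now sends a configCallback carrying an acknowledgement channel and blocks until DoWork signals that the new config has been applied, and DoWork receives from the config inbox in the main blocking select as well as in the non-blocking priority check. Without that second receive, a caller blocked inside UpdateConfig could wait forever while DoWork sat waiting for series that never arrive. Below is a minimal sketch of the same pattern using plain channels instead of the go-actor mailboxes; the worker, config, and field names are illustrative stand-ins, not the real manager.

package main

import (
	"context"
	"fmt"
	"time"
)

// config stands in for types.ConnectionConfig.
type config struct{ batchCount int }

// configUpdate pairs a config with an ack channel so the sender can block
// until the worker has applied the change, mirroring configCallback above.
type configUpdate struct {
	cfg config
	ch  chan struct{}
}

type worker struct {
	data chan int
	cfgs chan configUpdate
	cfg  config
}

// UpdateConfig returns only after the worker goroutine has applied cfg.
func (w *worker) UpdateConfig(ctx context.Context, cfg config) error {
	ch := make(chan struct{})
	defer close(ch)
	select {
	case w.cfgs <- configUpdate{cfg: cfg, ch: ch}:
	case <-ctx.Done():
		return ctx.Err()
	}
	<-ch // wait for the worker to acknowledge the new config
	return nil
}

func (w *worker) run(ctx context.Context) {
	for {
		// Priority check: apply any pending config change first.
		select {
		case u := <-w.cfgs:
			w.cfg = u.cfg
			u.ch <- struct{}{}
			continue
		default:
		}
		// Main work loop. The config channel must be selected on here too,
		// otherwise UpdateConfig blocks forever whenever no data is flowing.
		select {
		case <-ctx.Done():
			return
		case d := <-w.data:
			fmt.Println("processing", d, "with batch count", w.cfg.batchCount)
		case u := <-w.cfgs:
			w.cfg = u.cfg
			u.ch <- struct{}{}
		}
	}
}

func main() {
	w := &worker{data: make(chan int), cfgs: make(chan configUpdate)}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go w.run(ctx)

	if err := w.UpdateConfig(ctx, config{batchCount: 20}); err != nil {
		fmt.Println("update failed:", err)
		return
	}
	fmt.Println("config applied")
}

Running the sketch prints "config applied" even though no data is flowing; drop the config case from the second select and the same call blocks forever, which is the shape of the deadlock this commit addresses.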
19 changes: 10 additions & 9 deletions internal/component/prometheus/remote/queue/network/manager_test.go
@@ -2,6 +2,7 @@ package network

import (
"context"
"github.com/grafana/alloy/internal/util"
"io"
"math/rand"
"net/http"
@@ -64,33 +65,33 @@ func TestUpdatingConfig(t *testing.T) {
}))

defer svr.Close()
ctx := context.Background()
ctx, cncl := context.WithCancel(ctx)
defer cncl()

cc := types.ConnectionConfig{
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 10,
FlushFrequency: 1 * time.Second,
Connections: 4,
FlushFrequency: 5 * time.Second,
Connections: 1,
}

logger := log.NewNopLogger()
logger := util.TestAlloyLogger(t)

wr, err := New(cc, logger, func(s types.NetworkStats) {}, func(s types.NetworkStats) {})
require.NoError(t, err)
wr.Start()
defer wr.Stop()

cc2 := types.ConnectionConfig{
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 20,
FlushFrequency: 1 * time.Second,
FlushFrequency: 5 * time.Second,
Connections: 1,
}

err = wr.UpdateConfig(context.Background(), cc2)
ctx := context.Background()
err = wr.UpdateConfig(ctx, cc2)
require.NoError(t, err)
time.Sleep(1 * time.Second)
for i := 0; i < 100; i++ {
send(t, wr, ctx)
}
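A regression test for this class of bug typically just calls UpdateConfig on an otherwise idle client and fails if the call does not return within a deadline. A sketch of that shape, reusing the simplified worker from the earlier sketch rather than the real manager (none of these names come from the Alloy test suite):

package main

import (
	"context"
	"testing"
	"time"
)

// TestUpdateConfigDoesNotDeadlock checks that a config update completes even
// when no series are flowing through the worker.
func TestUpdateConfigDoesNotDeadlock(t *testing.T) {
	w := &worker{data: make(chan int), cfgs: make(chan configUpdate)}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go w.run(ctx)

	done := make(chan error, 1)
	go func() {
		done <- w.UpdateConfig(ctx, config{batchCount: 20})
	}()

	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("UpdateConfig returned an error: %v", err)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("UpdateConfig did not return; the worker is likely deadlocked")
	}
}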
2 changes: 2 additions & 0 deletions internal/component/prometheus/remote/queue/types/network.go
@@ -11,6 +11,8 @@ type NetworkClient interface {
Stop()
SendSeries(ctx context.Context, d *TimeSeriesBinary) error
SendMetadata(ctx context.Context, d *TimeSeriesBinary) error
// UpdateConfig is a synchronous call and will only return once the config
// is applied or an error occurs.
UpdateConfig(ctx context.Context, cfg ConnectionConfig) error
}
type ConnectionConfig struct {
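The new doc comment makes the contract explicit: once UpdateConfig returns, anything sent afterwards is handled with the new settings. Below is a hypothetical caller relying on that guarantee, written against simplified stand-ins (the real ConnectionConfig and NetworkClient live in Alloy's internal queue package and are not reproduced exactly here; []byte takes the place of *TimeSeriesBinary).

package main

import (
	"context"
	"fmt"
	"time"
)

// ConnectionConfig is a simplified stand-in; the field names mirror the
// values used in the test diff above, but this is not the real type.
type ConnectionConfig struct {
	URL            string
	Timeout        time.Duration
	BatchCount     int
	FlushFrequency time.Duration
	Connections    uint
}

// sender is the subset of the NetworkClient interface used below.
type sender interface {
	SendSeries(ctx context.Context, payload []byte) error
	UpdateConfig(ctx context.Context, cfg ConnectionConfig) error
}

// reconfigureAndSend applies a new config and only then resumes sending,
// relying on UpdateConfig returning after the config has been applied.
func reconfigureAndSend(ctx context.Context, c sender, cfg ConnectionConfig, payloads [][]byte) error {
	if err := c.UpdateConfig(ctx, cfg); err != nil {
		return fmt.Errorf("apply config: %w", err)
	}
	// Everything sent from here on is batched with the new BatchCount,
	// FlushFrequency, and connection count.
	for _, p := range payloads {
		if err := c.SendSeries(ctx, p); err != nil {
			return fmt.Errorf("send series: %w", err)
		}
	}
	return nil
}

// fakeSender is a stub so the example runs on its own.
type fakeSender struct{ cfg ConnectionConfig }

func (f *fakeSender) UpdateConfig(_ context.Context, cfg ConnectionConfig) error {
	f.cfg = cfg
	return nil
}

func (f *fakeSender) SendSeries(_ context.Context, payload []byte) error {
	fmt.Printf("sent %d bytes with batch count %d\n", len(payload), f.cfg.BatchCount)
	return nil
}

func main() {
	cfg := ConnectionConfig{
		URL:            "http://localhost:9090",
		Timeout:        time.Second,
		BatchCount:     20,
		FlushFrequency: 5 * time.Second,
		Connections:    1,
	}
	err := reconfigureAndSend(context.Background(), &fakeSender{}, cfg, [][]byte{[]byte("series")})
	if err != nil {
		fmt.Println("error:", err)
	}
}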
