Skip to content

Commit

Permalink
Fix race conditions in unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdurham committed Oct 8, 2024
1 parent 98bc887 commit 2e32ce6
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 29 deletions.
14 changes: 14 additions & 0 deletions internal/component/prometheus/remote/queue/e2e_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const alloyMetadataFailed = "alloy_queue_metadata_network_failed"
const alloyMetadataRetried429 = "alloy_queue_metadata_network_retried_429"
const alloyMetadataRetried = "alloy_queue_metadata_network_retried"

const alloyNetworkTimestamp = "alloy_queue_series_network_timestamp_seconds"

// TestMetadata is the large end to end testing for the queue based wal, specifically for metadata.
func TestMetadata(t *testing.T) {
// Check assumes you are checking for any value that is not 0.
Expand Down Expand Up @@ -187,6 +189,10 @@ func TestMetrics(t *testing.T) {
name: inTimestamp,
valueFunc: isReasonableTimeStamp,
},
{
name: alloyNetworkTimestamp,
valueFunc: greaterThenZero,
},
},
},
{
Expand Down Expand Up @@ -308,6 +314,10 @@ func TestMetrics(t *testing.T) {
name: inTimestamp,
valueFunc: isReasonableTimeStamp,
},
{
name: alloyNetworkTimestamp,
valueFunc: greaterThenZero,
},
},
},
{
Expand Down Expand Up @@ -429,6 +439,10 @@ func TestMetrics(t *testing.T) {
name: inTimestamp,
valueFunc: isReasonableTimeStamp,
},
{
name: alloyNetworkTimestamp,
valueFunc: greaterThenZero,
},
},
},
{
Expand Down
118 changes: 89 additions & 29 deletions internal/component/prometheus/remote/queue/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func TestE2E(t *testing.T) {
type e2eTest struct {
name string
maker func(index int, app storage.Appender) (float64, labels.Labels)
tester func(samples []prompb.TimeSeries)
testMeta func(samples []prompb.MetricMetadata)
tester func(samples *safeSlice[prompb.TimeSeries])
testMeta func(samples *safeSlice[prompb.MetricMetadata])
}
tests := []e2eTest{
{
Expand All @@ -44,9 +44,10 @@ func TestE2E(t *testing.T) {
require.NoError(t, errApp)
return v, lbls
},
tester: func(samples []prompb.TimeSeries) {
tester: func(samples *safeSlice[prompb.TimeSeries]) {
t.Helper()
for _, s := range samples {
for i := 0; i < samples.Len(); i++ {
s := samples.Get(i)
require.True(t, len(s.Samples) == 1)
require.True(t, s.Samples[0].Timestamp > 0)
require.True(t, s.Samples[0].Value > 0)
Expand All @@ -64,8 +65,9 @@ func TestE2E(t *testing.T) {
require.NoError(t, errApp)
return 0, lbls
},
testMeta: func(samples []prompb.MetricMetadata) {
for _, s := range samples {
testMeta: func(samples *safeSlice[prompb.MetricMetadata]) {
for i := 0; i < samples.Len(); i++ {
s := samples.Get(i)
require.True(t, s.GetUnit() == "seconds")
require.True(t, s.Help == "metadata help")
require.True(t, s.Unit == "seconds")
Expand All @@ -83,9 +85,10 @@ func TestE2E(t *testing.T) {
require.NoError(t, errApp)
return h.Sum, lbls
},
tester: func(samples []prompb.TimeSeries) {
tester: func(samples *safeSlice[prompb.TimeSeries]) {
t.Helper()
for _, s := range samples {
for i := 0; i < samples.Len(); i++ {
s := samples.Get(i)
require.True(t, len(s.Samples) == 1)
require.True(t, s.Samples[0].Timestamp > 0)
require.True(t, s.Samples[0].Value == 0)
Expand All @@ -102,9 +105,10 @@ func TestE2E(t *testing.T) {
require.NoError(t, errApp)
return h.Sum, lbls
},
tester: func(samples []prompb.TimeSeries) {
tester: func(samples *safeSlice[prompb.TimeSeries]) {
t.Helper()
for _, s := range samples {
for i := 0; i < samples.Len(); i++ {
s := samples.Get(i)
require.True(t, len(s.Samples) == 1)
require.True(t, s.Samples[0].Timestamp > 0)
require.True(t, s.Samples[0].Value == 0)
Expand All @@ -122,23 +126,23 @@ func TestE2E(t *testing.T) {
}

const (
iterations = 100
iterations = 10
items = 10_000
)

func runTest(t *testing.T, add func(index int, appendable storage.Appender) (float64, labels.Labels), test func(samples []prompb.TimeSeries), metaTest func(meta []prompb.MetricMetadata)) {
func runTest(t *testing.T, add func(index int, appendable storage.Appender) (float64, labels.Labels), test func(samples *safeSlice[prompb.TimeSeries]), metaTest func(meta *safeSlice[prompb.MetricMetadata])) {
l := util.TestAlloyLogger(t)
done := make(chan struct{})
var series atomic.Int32
var meta atomic.Int32
samples := make([]prompb.TimeSeries, 0)
metaSamples := make([]prompb.MetricMetadata, 0)
samples := newSafeSlice[prompb.TimeSeries]()
metaSamples := newSafeSlice[prompb.MetricMetadata]()
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
newSamples, newMetadata := handlePost(t, w, r)
series.Add(int32(len(newSamples)))
meta.Add(int32(len(newMetadata)))
samples = append(samples, newSamples...)
metaSamples = append(metaSamples, newMetadata...)
samples.AddSlice(newSamples)
metaSamples.AddSlice(newMetadata)
if series.Load() == iterations*items {
done <- struct{}{}
}
Expand All @@ -158,40 +162,43 @@ func runTest(t *testing.T, add func(index int, appendable storage.Appender) (flo
// Wait for export to spin up.
exp := <-expCh

index := 0
results := make(map[float64]labels.Labels)
mut := sync.Mutex{}
index := atomic.NewInt64(0)
results := &safeMap{
results: make(map[float64]labels.Labels),
}

for i := 0; i < iterations; i++ {
go func() {
app := exp.Receiver.Appender(ctx)
for j := 0; j < items; j++ {
index++
v, lbl := add(index, app)
mut.Lock()
results[v] = lbl
mut.Unlock()
val := index.Add(1)
v, lbl := add(int(val), app)
results.Add(v, lbl)
}
require.NoError(t, app.Commit())
}()
}
// This is a weird use case to handle eventually.
tm := time.NewTimer(15 * time.Second)
// With race turned on this can take a long time.
tm := time.NewTimer(20 * time.Second)
select {
case <-done:
case <-tm.C:
require.Truef(t, false, "failed to collect signals in the appropriate time")
}
cancel()
for _, s := range samples {

for i := 0; i < samples.Len(); i++ {
s := samples.Get(i)
if len(s.Histograms) == 1 {
lbls, ok := results[s.Histograms[0].Sum]
lbls, ok := results.Get(s.Histograms[0].Sum)
require.True(t, ok)
for i, sLbl := range s.Labels {
require.True(t, lbls[i].Name == sLbl.Name)
require.True(t, lbls[i].Value == sLbl.Value)
}
} else {
lbls, ok := results[s.Samples[0].Value]
lbls, ok := results.Get(s.Samples[0].Value)
require.True(t, ok)
for i, sLbl := range s.Labels {
require.True(t, lbls[i].Name == sLbl.Name)
Expand All @@ -204,7 +211,9 @@ func runTest(t *testing.T, add func(index int, appendable storage.Appender) (flo
} else {
metaTest(metaSamples)
}
require.Truef(t, types.OutStandingTimeSeriesBinary.Load() == 0, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load())
require.Eventuallyf(t, func() bool {
return types.OutStandingTimeSeriesBinary.Load() == 0
}, 2*time.Second, 100*time.Millisecond, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load())
}

func handlePost(t *testing.T, _ http.ResponseWriter, r *http.Request) ([]prompb.TimeSeries, []prompb.MetricMetadata) {
Expand Down Expand Up @@ -362,3 +371,54 @@ func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports,
}},
})
}

func newSafeSlice[T any]() *safeSlice[T] {
return &safeSlice[T]{slice: make([]T, 0)}
}

type safeSlice[T any] struct {
slice []T
mut sync.Mutex
}

func (s *safeSlice[T]) Add(v T) {
s.mut.Lock()
defer s.mut.Unlock()
s.slice = append(s.slice, v)
}

func (s *safeSlice[T]) AddSlice(v []T) {
s.mut.Lock()
defer s.mut.Unlock()
s.slice = append(s.slice, v...)
}

func (s *safeSlice[T]) Len() int {
s.mut.Lock()
defer s.mut.Unlock()
return len(s.slice)
}

func (s *safeSlice[T]) Get(i int) T {
s.mut.Lock()
defer s.mut.Unlock()
return s.slice[i]
}

type safeMap struct {
mut sync.Mutex
results map[float64]labels.Labels
}

func (s *safeMap) Add(v float64, ls labels.Labels) {
s.mut.Lock()
defer s.mut.Unlock()
s.results[v] = ls
}

func (s *safeMap) Get(v float64) (labels.Labels, bool) {
s.mut.Lock()
defer s.mut.Unlock()
res, ok := s.results[v]
return res, ok
}

0 comments on commit 2e32ce6

Please sign in to comment.