Skip to content

Commit

Permalink
only run registry logic once
Browse files Browse the repository at this point in the history
  • Loading branch information
BinaryFissionGames committed Sep 5, 2024
1 parent 90691a6 commit a77062f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
12 changes: 6 additions & 6 deletions internal/measurements/throughput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
tmp.AddMetrics(context.Background(), metrics)
tmp.AddTraces(context.Background(), traces)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

Expand Down Expand Up @@ -257,7 +257,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
tmp.AddMetrics(context.Background(), metrics)
tmp.AddTraces(context.Background(), traces)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

Expand All @@ -281,7 +281,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {

tmp.AddMetrics(context.Background(), metrics)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

Expand All @@ -306,7 +306,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {

tmp.AddMetrics(context.Background(), metrics)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

Expand Down Expand Up @@ -341,7 +341,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
tmp.AddMetrics(context.Background(), metrics)
tmp.AddTraces(context.Background(), traces)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

Expand Down Expand Up @@ -373,7 +373,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
tmp.AddMetrics(context.Background(), metrics)
tmp.AddTraces(context.Background(), traces)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

reg.Reset()

Expand Down
2 changes: 1 addition & 1 deletion opamp/observiq/measurements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestMeasurementsSender(t *testing.T) {
tm.AddMetrics(context.Background(), m)

reg := measurements.NewResettableThroughputMeasurementsRegistry(false)
reg.RegisterThroughputMeasurements(processorID, tm)
require.NoError(t, reg.RegisterThroughputMeasurements(processorID, tm))

ms := newMeasurementsSender(zap.NewNop(), reg, client, 1*time.Millisecond, nil)
ms.Start()
Expand Down
29 changes: 18 additions & 11 deletions processor/throughputmeasurementprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"

"github.com/observiq/bindplane-agent/internal/measurements"
"go.opentelemetry.io/collector/component"
Expand All @@ -35,6 +36,7 @@ type throughputMeasurementProcessor struct {
samplingCutOffRatio float64
processorID component.ID
bindplane component.ID
startOnce sync.Once
}

func newThroughputMeasurementProcessor(logger *zap.Logger, mp metric.MeterProvider, cfg *Config, processorID component.ID) (*throughputMeasurementProcessor, error) {
Expand All @@ -50,24 +52,29 @@ func newThroughputMeasurementProcessor(logger *zap.Logger, mp metric.MeterProvid
samplingCutOffRatio: cfg.SamplingRatio,
processorID: processorID,
bindplane: cfg.BindplaneExtension,
startOnce: sync.Once{},
}, nil
}

func (tmp *throughputMeasurementProcessor) start(_ context.Context, host component.Host) error {
var err error
tmp.startOnce.Do(func() {
registry, getRegErr := GetThroughputRegistry(host, tmp.bindplane)
if getRegErr != nil {
err = fmt.Errorf("get throughput registry: %w", err)
return
}

registry, err := GetThroughputRegistry(host, tmp.bindplane)
if err != nil {
return fmt.Errorf("get throughput registry: %w", err)
}

if registry != nil {
err := registry.RegisterThroughputMeasurements(tmp.processorID.String(), tmp.measurements)
if err != nil {
return fmt.Errorf("register throughput measurements: %w", err)
if registry != nil {
registerErr := registry.RegisterThroughputMeasurements(tmp.processorID.String(), tmp.measurements)
if registerErr != nil {
err = fmt.Errorf("register throughput measurements: %w", err)
return
}
}
}
})

return nil
return err
}

func (tmp *throughputMeasurementProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
Expand Down

0 comments on commit a77062f

Please sign in to comment.