diff --git a/extension/bindplaneextension/extension.go b/extension/bindplaneextension/extension.go index 003608bda..4f9d0dc51 100644 --- a/extension/bindplaneextension/extension.go +++ b/extension/bindplaneextension/extension.go @@ -66,8 +66,8 @@ func (b *bindplaneExtension) Start(_ context.Context, host component.Host) error return nil } -func (b *bindplaneExtension) RegisterThroughputMeasurements(processorID string, measurements *measurements.ThroughputMeasurements) { - b.ctmr.RegisterThroughputMeasurements(processorID, measurements) +func (b *bindplaneExtension) RegisterThroughputMeasurements(processorID string, measurements *measurements.ThroughputMeasurements) error { + return b.ctmr.RegisterThroughputMeasurements(processorID, measurements) } func (b *bindplaneExtension) setupCustomCapabilities(host component.Host) error { diff --git a/internal/measurements/custom_message.go b/internal/measurements/custom_message.go index 75e44f280..78bb5e257 100644 --- a/internal/measurements/custom_message.go +++ b/internal/measurements/custom_message.go @@ -52,24 +52,25 @@ func OTLPThroughputMeasurements(tm *ThroughputMeasurements, includeCountMetrics ts := pcommon.NewTimestampFromTime(time.Now()) - setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_log_data_size", tm.LogSize(), attrs, ts) - setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_metric_data_size", tm.MetricSize(), attrs, ts) - setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_trace_data_size", tm.TraceSize(), attrs, ts) + addOTLPSum(s, "otelcol_processor_throughputmeasurement_log_data_size", tm.LogSize(), attrs, ts) + addOTLPSum(s, "otelcol_processor_throughputmeasurement_metric_data_size", tm.MetricSize(), attrs, ts) + addOTLPSum(s, "otelcol_processor_throughputmeasurement_trace_data_size", tm.TraceSize(), attrs, ts) if includeCountMetrics { - setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_log_count", tm.LogCount(), attrs, ts) - setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_metric_count", tm.DatapointCount(), attrs, ts) - setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_trace_count", tm.TraceSize(), attrs, ts) + addOTLPSum(s, "otelcol_processor_throughputmeasurement_log_count", tm.LogCount(), attrs, ts) + addOTLPSum(s, "otelcol_processor_throughputmeasurement_metric_count", tm.DatapointCount(), attrs, ts) + addOTLPSum(s, "otelcol_processor_throughputmeasurement_trace_count", tm.TraceSize(), attrs, ts) } return s } -func setOTLPSum(m pmetric.Metric, name string, value int64, attrs pcommon.Map, now pcommon.Timestamp) { +func addOTLPSum(ms pmetric.MetricSlice, name string, value int64, attrs pcommon.Map, now pcommon.Timestamp) { if value == 0 { // Ignore value if it's 0 return } + m := ms.AppendEmpty() m.SetName(name) m.SetEmptySum() diff --git a/internal/measurements/testdata/expected/throughput_measurements_metrics_only.yaml b/internal/measurements/testdata/expected/throughput_measurements_metrics_only.yaml new file mode 100644 index 000000000..b6b04f3b6 --- /dev/null +++ b/internal/measurements/testdata/expected/throughput_measurements_metrics_only.yaml @@ -0,0 +1,14 @@ +resourceMetrics: + - resource: {} + scopeMetrics: + - metrics: + - name: otelcol_processor_throughputmeasurement_metric_data_size + sum: + dataPoints: + - asInt: "5675" + attributes: + - key: processor + value: + stringValue: throughputmeasurement/1 + timeUnixNano: "1000000" + scope: {} diff --git a/internal/measurements/throughput.go b/internal/measurements/throughput.go index 0380921ea..855106132 100644 --- a/internal/measurements/throughput.go +++ b/internal/measurements/throughput.go @@ -32,7 +32,9 @@ import ( // ThroughputMeasurementsRegistry represents a registry for the throughputmeasurement processor to // register their ThroughputMeasurements. type ThroughputMeasurementsRegistry interface { - RegisterThroughputMeasurements(processorID string, measurements *ThroughputMeasurements) + // RegisterThroughputMeasurements registers the measurements for the given processor. + // It should return an error if the processor has already been registered. + RegisterThroughputMeasurements(processorID string, measurements *ThroughputMeasurements) error } // ThroughputMeasurements represents all captured throughput metrics. @@ -228,8 +230,13 @@ func NewResettableThroughputMeasurementsRegistry(emitCountMetrics bool) *Resetta } // RegisterThroughputMeasurements registers the ThroughputMeasurements with the registry. -func (ctmr *ResettableThroughputMeasurementsRegistry) RegisterThroughputMeasurements(processorID string, measurements *ThroughputMeasurements) { - ctmr.measurements.Store(processorID, measurements) +func (ctmr *ResettableThroughputMeasurementsRegistry) RegisterThroughputMeasurements(processorID string, measurements *ThroughputMeasurements) error { + _, alreadyExists := ctmr.measurements.LoadOrStore(processorID, measurements) + if alreadyExists { + return fmt.Errorf("measurements for processor %q was already registered", processorID) + } + + return nil } // Reset unregisters all throughput measurements in this registry diff --git a/internal/measurements/throughput_test.go b/internal/measurements/throughput_test.go index 3012a8eec..5cbb100b4 100644 --- a/internal/measurements/throughput_test.go +++ b/internal/measurements/throughput_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -225,7 +226,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) { tmp.AddMetrics(context.Background(), metrics) tmp.AddTraces(context.Background(), traces) - reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp) + require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)) actualMetrics := reg.OTLPMeasurements(nil) @@ -257,7 +258,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) { tmp.AddMetrics(context.Background(), metrics) tmp.AddTraces(context.Background(), traces) - reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp) + require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)) actualMetrics := reg.OTLPMeasurements(nil) @@ -267,6 +268,30 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) { require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreTimestamp())) }) + t.Run("Test only metrics throughput", func(t *testing.T) { + reg := NewResettableThroughputMeasurementsRegistry(false) + + mp := metric.NewMeterProvider() + defer mp.Shutdown(context.Background()) + + tmp, err := NewThroughputMeasurements(mp, "throughputmeasurement/1", map[string]string{}) + require.NoError(t, err) + + metrics, err := golden.ReadMetrics(filepath.Join("testdata", "metrics", "host-metrics.yaml")) + require.NoError(t, err) + + tmp.AddMetrics(context.Background(), metrics) + + require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)) + + actualMetrics := reg.OTLPMeasurements(nil) + + expectedMetrics, err := golden.ReadMetrics(filepath.Join("testdata", "expected", "throughput_measurements_metrics_only.yaml")) + require.NoError(t, err) + + require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreTimestamp())) + }) + t.Run("Test registered measurements are in OTLP payload (extra attributes)", func(t *testing.T) { reg := NewResettableThroughputMeasurementsRegistry(false) @@ -291,7 +316,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) { tmp.AddMetrics(context.Background(), metrics) tmp.AddTraces(context.Background(), traces) - reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp) + require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)) actualMetrics := reg.OTLPMeasurements(nil) @@ -323,10 +348,25 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) { tmp.AddMetrics(context.Background(), metrics) tmp.AddTraces(context.Background(), traces) - reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp) + require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)) reg.Reset() require.NoError(t, pmetrictest.CompareMetrics(pmetric.NewMetrics(), reg.OTLPMeasurements(nil))) }) + + t.Run("Double registering is an error", func(t *testing.T) { + reg := NewResettableThroughputMeasurementsRegistry(false) + + mp := noop.NewMeterProvider() + + tmp, err := NewThroughputMeasurements(mp, "throughputmeasurement/1", map[string]string{}) + require.NoError(t, err) + + require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)) + + err = reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp) + require.Error(t, err) + require.Equal(t, err.Error(), `measurements for processor "throughputmeasurement/1" was already registered`) + }) } diff --git a/opamp/observiq/measurements.go b/opamp/observiq/measurements.go index 367f3b408..0a26c516e 100644 --- a/opamp/observiq/measurements.go +++ b/opamp/observiq/measurements.go @@ -132,10 +132,9 @@ func (m *measurementsSender) loop() { case <-m.done: return case <-t.Chan(): - m.logger.Info("Ticker fired, sending measurements") if m.reporter == nil { // Continue if no reporter available - m.logger.Info("No reporter, skipping sending measurements.") + m.logger.Debug("No reporter, skipping sending measurements.") continue } diff --git a/opamp/observiq/measurements_test.go b/opamp/observiq/measurements_test.go index 8779fd85f..7ee7b1b56 100644 --- a/opamp/observiq/measurements_test.go +++ b/opamp/observiq/measurements_test.go @@ -60,7 +60,7 @@ func TestMeasurementsSender(t *testing.T) { tm.AddMetrics(context.Background(), m) reg := measurements.NewResettableThroughputMeasurementsRegistry(false) - reg.RegisterThroughputMeasurements(processorID, tm) + require.NoError(t, reg.RegisterThroughputMeasurements(processorID, tm)) ms := newMeasurementsSender(zap.NewNop(), reg, client, 1*time.Millisecond, nil) ms.Start() diff --git a/opamp/observiq/testdata/metrics/expected-throughput.yaml b/opamp/observiq/testdata/metrics/expected-throughput.yaml index a4a260616..b6b04f3b6 100644 --- a/opamp/observiq/testdata/metrics/expected-throughput.yaml +++ b/opamp/observiq/testdata/metrics/expected-throughput.yaml @@ -2,7 +2,6 @@ resourceMetrics: - resource: {} scopeMetrics: - metrics: - - {} - name: otelcol_processor_throughputmeasurement_metric_data_size sum: dataPoints: @@ -12,5 +11,4 @@ resourceMetrics: value: stringValue: throughputmeasurement/1 timeUnixNano: "1000000" - - {} scope: {} diff --git a/processor/throughputmeasurementprocessor/factory.go b/processor/throughputmeasurementprocessor/factory.go index 2456d847b..f1a94b805 100644 --- a/processor/throughputmeasurementprocessor/factory.go +++ b/processor/throughputmeasurementprocessor/factory.go @@ -17,6 +17,7 @@ package throughputmeasurementprocessor import ( "context" "fmt" + "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -60,7 +61,7 @@ func createTracesProcessor( nextConsumer consumer.Traces, ) (processor.Traces, error) { oCfg := cfg.(*Config) - tmp, err := newThroughputMeasurementProcessor(set.Logger, set.TelemetrySettings.MeterProvider, oCfg, set.ID.String()) + tmp, err := createOrGetProcessor(set, oCfg) if err != nil { return nil, fmt.Errorf("create throughputmeasurementprocessor: %w", err) } @@ -69,6 +70,7 @@ func createTracesProcessor( ctx, set, cfg, nextConsumer, tmp.processTraces, processorhelper.WithCapabilities(consumerCapabilities), processorhelper.WithStart(tmp.start), + processorhelper.WithShutdown(tmp.shutdown), ) } @@ -79,7 +81,7 @@ func createLogsProcessor( nextConsumer consumer.Logs, ) (processor.Logs, error) { oCfg := cfg.(*Config) - tmp, err := newThroughputMeasurementProcessor(set.Logger, set.TelemetrySettings.MeterProvider, oCfg, set.ID.String()) + tmp, err := createOrGetProcessor(set, oCfg) if err != nil { return nil, fmt.Errorf("create throughputmeasurementprocessor: %w", err) } @@ -88,6 +90,7 @@ func createLogsProcessor( ctx, set, cfg, nextConsumer, tmp.processLogs, processorhelper.WithCapabilities(consumerCapabilities), processorhelper.WithStart(tmp.start), + processorhelper.WithShutdown(tmp.shutdown), ) } @@ -98,7 +101,7 @@ func createMetricsProcessor( nextConsumer consumer.Metrics, ) (processor.Metrics, error) { oCfg := cfg.(*Config) - tmp, err := newThroughputMeasurementProcessor(set.Logger, set.TelemetrySettings.MeterProvider, oCfg, set.ID.String()) + tmp, err := createOrGetProcessor(set, oCfg) if err != nil { return nil, fmt.Errorf("create throughputmeasurementprocessor: %w", err) } @@ -107,5 +110,38 @@ func createMetricsProcessor( ctx, set, cfg, nextConsumer, tmp.processMetrics, processorhelper.WithCapabilities(consumerCapabilities), processorhelper.WithStart(tmp.start), + processorhelper.WithShutdown(tmp.shutdown), ) } + +func createOrGetProcessor(set processor.Settings, cfg *Config) (*throughputMeasurementProcessor, error) { + processorsMux.Lock() + defer processorsMux.Unlock() + + var tmp *throughputMeasurementProcessor + if p, ok := processors[set.ID]; ok { + tmp = p + } else { + var err error + tmp, err = newThroughputMeasurementProcessor(set.Logger, set.MeterProvider, cfg, set.ID) + if err != nil { + return nil, err + } + + processors[set.ID] = tmp + } + + return tmp, nil +} + +func unregisterProcessor(id component.ID) { + processorsMux.Lock() + defer processorsMux.Unlock() + delete(processors, id) +} + +// processors is a map of component.ID to an instance of throughput processor. +// It is used so that only one instance of a particular throughput processor exists, even if it's included +// across multiple pipelines/signal types. +var processors = map[component.ID]*throughputMeasurementProcessor{} +var processorsMux = sync.Mutex{} diff --git a/processor/throughputmeasurementprocessor/factory_test.go b/processor/throughputmeasurementprocessor/factory_test.go index 9ac74f1f0..b334da790 100644 --- a/processor/throughputmeasurementprocessor/factory_test.go +++ b/processor/throughputmeasurementprocessor/factory_test.go @@ -15,9 +15,14 @@ package throughputmeasurementprocessor import ( + "context" "testing" + "github.com/observiq/bindplane-agent/internal/measurements" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor/processortest" ) func TestNewFactory(t *testing.T) { @@ -33,3 +38,123 @@ func TestNewFactory(t *testing.T) { require.True(t, ok) require.Equal(t, expectedCfg, cfg) } + +// Test that 2 instances with the same processor ID will not error when started +func TestCreateProcessorTwice_Logs(t *testing.T) { + processorID := component.MustNewIDWithName("throughputmeasurement", "1") + bindplaneExtensionID := component.MustNewID("bindplane") + + set := processortest.NewNopSettings() + set.ID = processorID + + cfg := &Config{ + Enabled: true, + SamplingRatio: 1, + BindplaneExtension: bindplaneExtensionID, + } + + l1, err := createLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + l2, err := createLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + + mockBindplane := mockThoughputRegistry{ + ResettableThroughputMeasurementsRegistry: measurements.NewResettableThroughputMeasurementsRegistry(false), + } + + mh := mockHost{ + extMap: map[component.ID]component.Component{ + bindplaneExtensionID: mockBindplane, + }, + } + + require.NoError(t, l1.Start(context.Background(), mh)) + require.NoError(t, l2.Start(context.Background(), mh)) + require.NoError(t, l1.Shutdown(context.Background())) + require.NoError(t, l2.Shutdown(context.Background())) +} + +// Test that 2 instances with the same processor ID will not error when started +func TestCreateProcessorTwice_Metrics(t *testing.T) { + processorID := component.MustNewIDWithName("throughputmeasurement", "1") + bindplaneExtensionID := component.MustNewID("bindplane") + + set := processortest.NewNopSettings() + set.ID = processorID + + cfg := &Config{ + Enabled: true, + SamplingRatio: 1, + BindplaneExtension: bindplaneExtensionID, + } + + l1, err := createMetricsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + l2, err := createMetricsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + + mockBindplane := mockThoughputRegistry{ + ResettableThroughputMeasurementsRegistry: measurements.NewResettableThroughputMeasurementsRegistry(false), + } + + mh := mockHost{ + extMap: map[component.ID]component.Component{ + bindplaneExtensionID: mockBindplane, + }, + } + + require.NoError(t, l1.Start(context.Background(), mh)) + require.NoError(t, l2.Start(context.Background(), mh)) + require.NoError(t, l1.Shutdown(context.Background())) + require.NoError(t, l2.Shutdown(context.Background())) +} + +// Test that 2 instances with the same processor ID will not error when started +func TestCreateProcessorTwice_Traces(t *testing.T) { + processorID := component.MustNewIDWithName("throughputmeasurement", "1") + bindplaneExtensionID := component.MustNewID("bindplane") + + set := processortest.NewNopSettings() + set.ID = processorID + + cfg := &Config{ + Enabled: true, + SamplingRatio: 1, + BindplaneExtension: bindplaneExtensionID, + } + + l1, err := createTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + l2, err := createTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + + mockBindplane := mockThoughputRegistry{ + ResettableThroughputMeasurementsRegistry: measurements.NewResettableThroughputMeasurementsRegistry(false), + } + + mh := mockHost{ + extMap: map[component.ID]component.Component{ + bindplaneExtensionID: mockBindplane, + }, + } + + require.NoError(t, l1.Start(context.Background(), mh)) + require.NoError(t, l2.Start(context.Background(), mh)) + require.NoError(t, l1.Shutdown(context.Background())) + require.NoError(t, l2.Shutdown(context.Background())) +} + +type mockHost struct { + extMap map[component.ID]component.Component +} + +func (m mockHost) GetExtensions() map[component.ID]component.Component { + return m.extMap +} + +type mockThoughputRegistry struct { + *measurements.ResettableThroughputMeasurementsRegistry +} + +func (mockThoughputRegistry) Start(_ context.Context, _ component.Host) error { return nil } +func (mockThoughputRegistry) Shutdown(_ context.Context) error { return nil } diff --git a/processor/throughputmeasurementprocessor/go.mod b/processor/throughputmeasurementprocessor/go.mod index 2c21ee044..f75469d00 100644 --- a/processor/throughputmeasurementprocessor/go.mod +++ b/processor/throughputmeasurementprocessor/go.mod @@ -9,6 +9,7 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.108.0 go.opentelemetry.io/collector/consumer v0.108.0 + go.opentelemetry.io/collector/consumer/consumertest v0.108.0 go.opentelemetry.io/collector/pdata v1.14.1 go.opentelemetry.io/collector/processor v0.108.0 go.opentelemetry.io/otel v1.28.0 @@ -18,6 +19,7 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -25,14 +27,23 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.108.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.20.1 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect go.opentelemetry.io/collector v0.108.0 // indirect + go.opentelemetry.io/collector/component/componentstatus v0.108.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.108.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.108.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.108.0 // indirect + go.opentelemetry.io/collector/pdata/testdata v0.108.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/processor/throughputmeasurementprocessor/go.sum b/processor/throughputmeasurementprocessor/go.sum index 634db50a0..05adfec17 100644 --- a/processor/throughputmeasurementprocessor/go.sum +++ b/processor/throughputmeasurementprocessor/go.sum @@ -27,6 +27,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/processor/throughputmeasurementprocessor/processor.go b/processor/throughputmeasurementprocessor/processor.go index 4f57a19d2..4e008c537 100644 --- a/processor/throughputmeasurementprocessor/processor.go +++ b/processor/throughputmeasurementprocessor/processor.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math/rand" + "sync" "github.com/observiq/bindplane-agent/internal/measurements" "go.opentelemetry.io/collector/component" @@ -33,12 +34,13 @@ type throughputMeasurementProcessor struct { enabled bool measurements *measurements.ThroughputMeasurements samplingCutOffRatio float64 - processorID string + processorID component.ID bindplane component.ID + startOnce sync.Once } -func newThroughputMeasurementProcessor(logger *zap.Logger, mp metric.MeterProvider, cfg *Config, processorID string) (*throughputMeasurementProcessor, error) { - measurements, err := measurements.NewThroughputMeasurements(mp, processorID, cfg.ExtraLabels) +func newThroughputMeasurementProcessor(logger *zap.Logger, mp metric.MeterProvider, cfg *Config, processorID component.ID) (*throughputMeasurementProcessor, error) { + measurements, err := measurements.NewThroughputMeasurements(mp, processorID.String(), cfg.ExtraLabels) if err != nil { return nil, fmt.Errorf("create throughput measurements: %w", err) } @@ -50,21 +52,29 @@ func newThroughputMeasurementProcessor(logger *zap.Logger, mp metric.MeterProvid samplingCutOffRatio: cfg.SamplingRatio, processorID: processorID, bindplane: cfg.BindplaneExtension, + startOnce: sync.Once{}, }, nil } func (tmp *throughputMeasurementProcessor) start(_ context.Context, host component.Host) error { + var err error + tmp.startOnce.Do(func() { + registry, getRegErr := GetThroughputRegistry(host, tmp.bindplane) + if getRegErr != nil { + err = fmt.Errorf("get throughput registry: %w", getRegErr) + return + } - registry, err := GetThroughputRegistry(host, tmp.bindplane) - if err != nil { - return fmt.Errorf("get throughput registry: %w", err) - } - - if registry != nil { - registry.RegisterThroughputMeasurements(tmp.processorID, tmp.measurements) - } + if registry != nil { + registerErr := registry.RegisterThroughputMeasurements(tmp.processorID.String(), tmp.measurements) + if registerErr != nil { + err = fmt.Errorf("register throughput measurements: %w", registerErr) + return + } + } + }) - return nil + return err } func (tmp *throughputMeasurementProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { @@ -99,3 +109,8 @@ func (tmp *throughputMeasurementProcessor) processMetrics(ctx context.Context, m return md, nil } + +func (tmp *throughputMeasurementProcessor) shutdown(_ context.Context) error { + unregisterProcessor(tmp.processorID) + return nil +} diff --git a/processor/throughputmeasurementprocessor/processor_test.go b/processor/throughputmeasurementprocessor/processor_test.go index 9e30a1f27..ca54f97b9 100644 --- a/processor/throughputmeasurementprocessor/processor_test.go +++ b/processor/throughputmeasurementprocessor/processor_test.go @@ -24,6 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -39,7 +40,7 @@ func TestProcessor_Logs(t *testing.T) { ) defer mp.Shutdown(context.Background()) - processorID := "throughputmeasurement/1" + processorID := component.MustNewIDWithName("throughputmeasurement", "1") tmp, err := newThroughputMeasurementProcessor(zap.NewNop(), mp, &Config{ Enabled: true, @@ -71,7 +72,7 @@ func TestProcessor_Logs(t *testing.T) { processorAttr, ok := sum.DataPoints[0].Attributes.Value(attribute.Key("processor")) require.True(t, ok, "processor attribute was not found") - require.Equal(t, processorID, processorAttr.AsString()) + require.Equal(t, processorID.String(), processorAttr.AsString()) logSize = sum.DataPoints[0].Value @@ -81,7 +82,7 @@ func TestProcessor_Logs(t *testing.T) { processorAttr, ok := sum.DataPoints[0].Attributes.Value(attribute.Key("processor")) require.True(t, ok, "processor attribute was not found") - require.Equal(t, processorID, processorAttr.AsString()) + require.Equal(t, processorID.String(), processorAttr.AsString()) logCount = sum.DataPoints[0].Value } @@ -102,7 +103,7 @@ func TestProcessor_Metrics(t *testing.T) { ) defer mp.Shutdown(context.Background()) - processorID := "throughputmeasurement/1" + processorID := component.MustNewIDWithName("throughputmeasurement", "1") tmp, err := newThroughputMeasurementProcessor(zap.NewNop(), mp, &Config{ Enabled: true, @@ -134,7 +135,7 @@ func TestProcessor_Metrics(t *testing.T) { processorAttr, ok := sum.DataPoints[0].Attributes.Value(attribute.Key("processor")) require.True(t, ok, "processor attribute was not found") - require.Equal(t, processorID, processorAttr.AsString()) + require.Equal(t, processorID.String(), processorAttr.AsString()) metricSize = sum.DataPoints[0].Value @@ -144,7 +145,7 @@ func TestProcessor_Metrics(t *testing.T) { processorAttr, ok := sum.DataPoints[0].Attributes.Value(attribute.Key("processor")) require.True(t, ok, "processor attribute was not found") - require.Equal(t, processorID, processorAttr.AsString()) + require.Equal(t, processorID.String(), processorAttr.AsString()) datapointCount = sum.DataPoints[0].Value } @@ -165,7 +166,7 @@ func TestProcessor_Traces(t *testing.T) { ) defer mp.Shutdown(context.Background()) - processorID := "throughputmeasurement/1" + processorID := component.MustNewIDWithName("throughputmeasurement", "1") tmp, err := newThroughputMeasurementProcessor(zap.NewNop(), mp, &Config{ Enabled: true, @@ -197,7 +198,7 @@ func TestProcessor_Traces(t *testing.T) { processorAttr, ok := sum.DataPoints[0].Attributes.Value(attribute.Key("processor")) require.True(t, ok, "processor attribute was not found") - require.Equal(t, processorID, processorAttr.AsString()) + require.Equal(t, processorID.String(), processorAttr.AsString()) traceSize = sum.DataPoints[0].Value @@ -207,7 +208,7 @@ func TestProcessor_Traces(t *testing.T) { processorAttr, ok := sum.DataPoints[0].Attributes.Value(attribute.Key("processor")) require.True(t, ok, "processor attribute was not found") - require.Equal(t, processorID, processorAttr.AsString()) + require.Equal(t, processorID.String(), processorAttr.AsString()) spanCount = sum.DataPoints[0].Value } @@ -229,7 +230,7 @@ func TestProcessor_Logs_TwoInstancesSameID(t *testing.T) { ) defer mp.Shutdown(context.Background()) - processorID := "throughputmeasurement/1" + processorID := component.MustNewIDWithName("throughputmeasurement", "1") tmp1, err := newThroughputMeasurementProcessor(zap.NewNop(), mp, &Config{ Enabled: true, @@ -267,7 +268,7 @@ func TestProcessor_Logs_TwoInstancesSameID(t *testing.T) { processorAttr, ok := sum.DataPoints[0].Attributes.Value(attribute.Key("processor")) require.True(t, ok, "processor attribute was not found") - require.Equal(t, processorID, processorAttr.AsString()) + require.Equal(t, processorID.String(), processorAttr.AsString()) logSize = sum.DataPoints[0].Value @@ -277,7 +278,7 @@ func TestProcessor_Logs_TwoInstancesSameID(t *testing.T) { processorAttr, ok := sum.DataPoints[0].Attributes.Value(attribute.Key("processor")) require.True(t, ok, "processor attribute was not found") - require.Equal(t, processorID, processorAttr.AsString()) + require.Equal(t, processorID.String(), processorAttr.AsString()) logCount = sum.DataPoints[0].Value } @@ -299,8 +300,8 @@ func TestProcessor_Logs_TwoInstancesDifferentID(t *testing.T) { ) defer mp.Shutdown(context.Background()) - processorID1 := "throughputmeasurement/1" - processorID2 := "throughputmeasurement/2" + processorID1 := component.MustNewIDWithName("throughputmeasurement", "1") + processorID2 := component.MustNewIDWithName("throughputmeasurement", "2") tmp1, err := newThroughputMeasurementProcessor(zap.NewNop(), mp, &Config{ Enabled: true, @@ -344,9 +345,9 @@ func TestProcessor_Logs_TwoInstancesDifferentID(t *testing.T) { require.True(t, ok, "processor attribute was not found") switch processorAttr.AsString() { - case processorID1: + case processorID1.String(): logSize1 = dp.Value - case processorID2: + case processorID2.String(): logSize2 = dp.Value default: require.Fail(t, "ID %s should not be present in log data size metrics", processorAttr.AsString()) @@ -362,9 +363,9 @@ func TestProcessor_Logs_TwoInstancesDifferentID(t *testing.T) { require.True(t, ok, "processor attribute was not found") switch processorAttr.AsString() { - case processorID1: + case processorID1.String(): logCount1 = dp.Value - case processorID2: + case processorID2.String(): logCount2 = dp.Value default: require.Fail(t, "ID %s should not be present in log count metrics", processorAttr.AsString())