Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Throughput processor should be singleton #1842

Merged
merged 10 commits into from
Sep 6, 2024
4 changes: 2 additions & 2 deletions extension/bindplaneextension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (b *bindplaneExtension) Start(_ context.Context, host component.Host) error
return nil
}

func (b *bindplaneExtension) RegisterThroughputMeasurements(processorID string, measurements *measurements.ThroughputMeasurements) {
b.ctmr.RegisterThroughputMeasurements(processorID, measurements)
func (b *bindplaneExtension) RegisterThroughputMeasurements(processorID string, measurements *measurements.ThroughputMeasurements) error {
return b.ctmr.RegisterThroughputMeasurements(processorID, measurements)
}

func (b *bindplaneExtension) setupCustomCapabilities(host component.Host) error {
Expand Down
15 changes: 8 additions & 7 deletions internal/measurements/custom_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,25 @@ func OTLPThroughputMeasurements(tm *ThroughputMeasurements, includeCountMetrics

ts := pcommon.NewTimestampFromTime(time.Now())

setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_log_data_size", tm.LogSize(), attrs, ts)
setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_metric_data_size", tm.MetricSize(), attrs, ts)
setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_trace_data_size", tm.TraceSize(), attrs, ts)
addOTLPSum(s, "otelcol_processor_throughputmeasurement_log_data_size", tm.LogSize(), attrs, ts)
addOTLPSum(s, "otelcol_processor_throughputmeasurement_metric_data_size", tm.MetricSize(), attrs, ts)
addOTLPSum(s, "otelcol_processor_throughputmeasurement_trace_data_size", tm.TraceSize(), attrs, ts)

if includeCountMetrics {
setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_log_count", tm.LogCount(), attrs, ts)
setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_metric_count", tm.DatapointCount(), attrs, ts)
setOTLPSum(s.AppendEmpty(), "otelcol_processor_throughputmeasurement_trace_count", tm.TraceSize(), attrs, ts)
addOTLPSum(s, "otelcol_processor_throughputmeasurement_log_count", tm.LogCount(), attrs, ts)
addOTLPSum(s, "otelcol_processor_throughputmeasurement_metric_count", tm.DatapointCount(), attrs, ts)
addOTLPSum(s, "otelcol_processor_throughputmeasurement_trace_count", tm.TraceSize(), attrs, ts)
}

return s
}

func setOTLPSum(m pmetric.Metric, name string, value int64, attrs pcommon.Map, now pcommon.Timestamp) {
func addOTLPSum(ms pmetric.MetricSlice, name string, value int64, attrs pcommon.Map, now pcommon.Timestamp) {
if value == 0 {
// Ignore value if it's 0
return
}
m := ms.AppendEmpty()

m.SetName(name)
m.SetEmptySum()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
resourceMetrics:
- resource: {}
scopeMetrics:
- metrics:
- name: otelcol_processor_throughputmeasurement_metric_data_size
sum:
dataPoints:
- asInt: "5675"
attributes:
- key: processor
value:
stringValue: throughputmeasurement/1
timeUnixNano: "1000000"
scope: {}
13 changes: 10 additions & 3 deletions internal/measurements/throughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
// ThroughputMeasurementsRegistry represents a registry for the throughputmeasurement processor to
// register their ThroughputMeasurements.
type ThroughputMeasurementsRegistry interface {
RegisterThroughputMeasurements(processorID string, measurements *ThroughputMeasurements)
// RegisterThroughputMeasurements registers the measurements for the given processor.
// It should return an error if the processor has already been registered.
RegisterThroughputMeasurements(processorID string, measurements *ThroughputMeasurements) error
antonblock marked this conversation as resolved.
Show resolved Hide resolved
}

// ThroughputMeasurements represents all captured throughput metrics.
Expand Down Expand Up @@ -228,8 +230,13 @@ func NewResettableThroughputMeasurementsRegistry(emitCountMetrics bool) *Resetta
}

// RegisterThroughputMeasurements registers the ThroughputMeasurements with the registry.
func (ctmr *ResettableThroughputMeasurementsRegistry) RegisterThroughputMeasurements(processorID string, measurements *ThroughputMeasurements) {
ctmr.measurements.Store(processorID, measurements)
func (ctmr *ResettableThroughputMeasurementsRegistry) RegisterThroughputMeasurements(processorID string, measurements *ThroughputMeasurements) error {
_, alreadyExists := ctmr.measurements.LoadOrStore(processorID, measurements)
if alreadyExists {
return fmt.Errorf("measurements for processor %q was already registered", processorID)
}

return nil
}

// Reset unregisters all throughput measurements in this registry
Expand Down
48 changes: 44 additions & 4 deletions internal/measurements/throughput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand Down Expand Up @@ -225,7 +226,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
tmp.AddMetrics(context.Background(), metrics)
tmp.AddTraces(context.Background(), traces)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

Expand Down Expand Up @@ -257,7 +258,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
tmp.AddMetrics(context.Background(), metrics)
tmp.AddTraces(context.Background(), traces)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

Expand All @@ -267,6 +268,30 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreTimestamp()))
})

t.Run("Test only metrics throughput", func(t *testing.T) {
reg := NewResettableThroughputMeasurementsRegistry(false)

mp := metric.NewMeterProvider()
defer mp.Shutdown(context.Background())

tmp, err := NewThroughputMeasurements(mp, "throughputmeasurement/1", map[string]string{})
require.NoError(t, err)

metrics, err := golden.ReadMetrics(filepath.Join("testdata", "metrics", "host-metrics.yaml"))
require.NoError(t, err)

tmp.AddMetrics(context.Background(), metrics)

require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

expectedMetrics, err := golden.ReadMetrics(filepath.Join("testdata", "expected", "throughput_measurements_metrics_only.yaml"))
require.NoError(t, err)

require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreTimestamp()))
})

t.Run("Test registered measurements are in OTLP payload (extra attributes)", func(t *testing.T) {
reg := NewResettableThroughputMeasurementsRegistry(false)

Expand All @@ -291,7 +316,7 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
tmp.AddMetrics(context.Background(), metrics)
tmp.AddTraces(context.Background(), traces)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

actualMetrics := reg.OTLPMeasurements(nil)

Expand Down Expand Up @@ -323,10 +348,25 @@ func TestResettableThroughputMeasurementsRegistry(t *testing.T) {
tmp.AddMetrics(context.Background(), metrics)
tmp.AddTraces(context.Background(), traces)

reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

reg.Reset()

require.NoError(t, pmetrictest.CompareMetrics(pmetric.NewMetrics(), reg.OTLPMeasurements(nil)))
})

t.Run("Double registering is an error", func(t *testing.T) {
reg := NewResettableThroughputMeasurementsRegistry(false)

mp := noop.NewMeterProvider()

tmp, err := NewThroughputMeasurements(mp, "throughputmeasurement/1", map[string]string{})
require.NoError(t, err)

require.NoError(t, reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp))

err = reg.RegisterThroughputMeasurements("throughputmeasurement/1", tmp)
require.Error(t, err)
require.Equal(t, err.Error(), `measurements for processor "throughputmeasurement/1" was already registered`)
})
}
3 changes: 1 addition & 2 deletions opamp/observiq/measurements.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,9 @@ func (m *measurementsSender) loop() {
case <-m.done:
return
case <-t.Chan():
m.logger.Info("Ticker fired, sending measurements")
if m.reporter == nil {
// Continue if no reporter available
m.logger.Info("No reporter, skipping sending measurements.")
m.logger.Debug("No reporter, skipping sending measurements.")
continue
}

Expand Down
2 changes: 1 addition & 1 deletion opamp/observiq/measurements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestMeasurementsSender(t *testing.T) {
tm.AddMetrics(context.Background(), m)

reg := measurements.NewResettableThroughputMeasurementsRegistry(false)
reg.RegisterThroughputMeasurements(processorID, tm)
require.NoError(t, reg.RegisterThroughputMeasurements(processorID, tm))

ms := newMeasurementsSender(zap.NewNop(), reg, client, 1*time.Millisecond, nil)
ms.Start()
Expand Down
2 changes: 0 additions & 2 deletions opamp/observiq/testdata/metrics/expected-throughput.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ resourceMetrics:
- resource: {}
scopeMetrics:
- metrics:
- {}
- name: otelcol_processor_throughputmeasurement_metric_data_size
sum:
dataPoints:
Expand All @@ -12,5 +11,4 @@ resourceMetrics:
value:
stringValue: throughputmeasurement/1
timeUnixNano: "1000000"
- {}
scope: {}
42 changes: 39 additions & 3 deletions processor/throughputmeasurementprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package throughputmeasurementprocessor
import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -60,7 +61,7 @@ func createTracesProcessor(
nextConsumer consumer.Traces,
) (processor.Traces, error) {
oCfg := cfg.(*Config)
tmp, err := newThroughputMeasurementProcessor(set.Logger, set.TelemetrySettings.MeterProvider, oCfg, set.ID.String())
tmp, err := createOrGetProcessor(set, oCfg)
if err != nil {
return nil, fmt.Errorf("create throughputmeasurementprocessor: %w", err)
}
Expand All @@ -69,6 +70,7 @@ func createTracesProcessor(
ctx, set, cfg, nextConsumer, tmp.processTraces,
processorhelper.WithCapabilities(consumerCapabilities),
processorhelper.WithStart(tmp.start),
processorhelper.WithShutdown(tmp.shutdown),
)
}

Expand All @@ -79,7 +81,7 @@ func createLogsProcessor(
nextConsumer consumer.Logs,
) (processor.Logs, error) {
oCfg := cfg.(*Config)
tmp, err := newThroughputMeasurementProcessor(set.Logger, set.TelemetrySettings.MeterProvider, oCfg, set.ID.String())
tmp, err := createOrGetProcessor(set, oCfg)
if err != nil {
return nil, fmt.Errorf("create throughputmeasurementprocessor: %w", err)
}
Expand All @@ -88,6 +90,7 @@ func createLogsProcessor(
ctx, set, cfg, nextConsumer, tmp.processLogs,
processorhelper.WithCapabilities(consumerCapabilities),
processorhelper.WithStart(tmp.start),
processorhelper.WithShutdown(tmp.shutdown),
)
}

Expand All @@ -98,7 +101,7 @@ func createMetricsProcessor(
nextConsumer consumer.Metrics,
) (processor.Metrics, error) {
oCfg := cfg.(*Config)
tmp, err := newThroughputMeasurementProcessor(set.Logger, set.TelemetrySettings.MeterProvider, oCfg, set.ID.String())
tmp, err := createOrGetProcessor(set, oCfg)
if err != nil {
return nil, fmt.Errorf("create throughputmeasurementprocessor: %w", err)
}
Expand All @@ -107,5 +110,38 @@ func createMetricsProcessor(
ctx, set, cfg, nextConsumer, tmp.processMetrics,
processorhelper.WithCapabilities(consumerCapabilities),
processorhelper.WithStart(tmp.start),
processorhelper.WithShutdown(tmp.shutdown),
)
}

func createOrGetProcessor(set processor.Settings, cfg *Config) (*throughputMeasurementProcessor, error) {
processorsMux.Lock()
defer processorsMux.Unlock()

var tmp *throughputMeasurementProcessor
if p, ok := processors[set.ID]; ok {
tmp = p
} else {
var err error
tmp, err = newThroughputMeasurementProcessor(set.Logger, set.MeterProvider, cfg, set.ID)
if err != nil {
return nil, err
}

processors[set.ID] = tmp
}

return tmp, nil
}

func unregisterProcessor(id component.ID) {
processorsMux.Lock()
defer processorsMux.Unlock()
delete(processors, id)
}

// processors is a map of component.ID to an instance of throughput processor.
// It is used so that only one instance of a particular throughput processor exists, even if it's included
// across multiple pipelines/signal types.
var processors = map[component.ID]*throughputMeasurementProcessor{}
var processorsMux = sync.Mutex{}
Loading
Loading