diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 23f73b06a33..83a02ad5f37 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -86,6 +86,10 @@ func GetAvailableLocalIPv6Address(t testing.TB) string { return endpoint } +func ptr[T any](v T) *T { + return &v +} + func GetAvailableLocalAddressPrometheus(t testing.TB) *config.Prometheus { address := GetAvailableLocalAddress(t) host, port, err := net.SplitHostPort(address) @@ -97,8 +101,14 @@ func GetAvailableLocalAddressPrometheus(t testing.TB) *config.Prometheus { return nil } return &config.Prometheus{ - Host: &host, - Port: &portInt, + Host: &host, + Port: &portInt, + WithoutScopeInfo: ptr(true), + WithoutTypeSuffix: ptr(true), + WithoutUnits: ptr(true), + WithResourceConstantLabels: &config.IncludeExclude{ + Included: []string{"*"}, + }, } } diff --git a/service/go.mod b/service/go.mod index 09fd63fa326..6bcf201c326 100644 --- a/service/go.mod +++ b/service/go.mod @@ -29,10 +29,7 @@ require ( go.opentelemetry.io/contrib/config v0.8.0 go.opentelemetry.io/contrib/propagators/b3 v1.28.0 go.opentelemetry.io/otel v1.28.0 - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 go.opentelemetry.io/otel/exporters/prometheus v0.50.0 - go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/sdk/metric v1.28.0 @@ -91,9 +88,12 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/contrib/zpages v0.53.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.4.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect go.opentelemetry.io/otel/log v0.4.0 // indirect go.opentelemetry.io/otel/sdk/log v0.4.0 // indirect diff --git a/service/internal/proctelemetry/config.go b/service/internal/proctelemetry/config.go deleted file mode 100644 index 1bb86bb65b2..00000000000 --- a/service/internal/proctelemetry/config.go +++ /dev/null @@ -1,347 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry" - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net" - "net/http" - "net/url" - "os" - "strings" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/contrib/config" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - otelprom "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" - "go.opentelemetry.io/otel/sdk/instrumentation" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/resource" - - "go.opentelemetry.io/collector/processor/processorhelper" - semconv "go.opentelemetry.io/collector/semconv/v1.18.0" -) - -const ( - - // gRPC Instrumentation Name - GRPCInstrumentation = "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - - // http Instrumentation Name - HTTPInstrumentation = "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - - // supported protocols - protocolProtobufHTTP = "http/protobuf" - protocolProtobufGRPC = "grpc/protobuf" - defaultReadHeaderTimeout = 10 * time.Second -) - -var ( - // GRPCUnacceptableKeyValues is a list of high cardinality grpc attributes that should be filtered out. - GRPCUnacceptableKeyValues = []attribute.KeyValue{ - attribute.String(semconv.AttributeNetSockPeerAddr, ""), - attribute.String(semconv.AttributeNetSockPeerPort, ""), - attribute.String(semconv.AttributeNetSockPeerName, ""), - } - - // HTTPUnacceptableKeyValues is a list of high cardinality http attributes that should be filtered out. - HTTPUnacceptableKeyValues = []attribute.KeyValue{ - attribute.String(semconv.AttributeNetHostName, ""), - attribute.String(semconv.AttributeNetHostPort, ""), - } - - errNoValidMetricExporter = errors.New("no valid metric exporter") -) - -func InitMetricReader(ctx context.Context, reader config.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { - if reader.Pull != nil { - return initPullExporter(reader.Pull.Exporter, asyncErrorChannel) - } - if reader.Periodic != nil { - var opts []sdkmetric.PeriodicReaderOption - if reader.Periodic.Interval != nil { - opts = append(opts, sdkmetric.WithInterval(time.Duration(*reader.Periodic.Interval)*time.Millisecond)) - } - - if reader.Periodic.Timeout != nil { - opts = append(opts, sdkmetric.WithTimeout(time.Duration(*reader.Periodic.Timeout)*time.Millisecond)) - } - return initPeriodicExporter(ctx, reader.Periodic.Exporter, opts...) - } - return nil, nil, fmt.Errorf("unsupported metric reader type %v", reader) -} - -func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disableHighCardinality bool) (*sdkmetric.MeterProvider, error) { - opts := []sdkmetric.Option{ - sdkmetric.WithResource(res), - sdkmetric.WithView(batchViews(disableHighCardinality)...), - } - - opts = append(opts, options...) - return sdkmetric.NewMeterProvider( - opts..., - ), nil -} - -func InitPrometheusServer(registry *prometheus.Registry, address string, asyncErrorChannel chan error) *http.Server { - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) - server := &http.Server{ - Addr: address, - Handler: mux, - ReadHeaderTimeout: defaultReadHeaderTimeout, - } - go func() { - if serveErr := server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) { - asyncErrorChannel <- serveErr - } - }() - return server -} - -func batchViews(disableHighCardinality bool) []sdkmetric.View { - views := []sdkmetric.View{ - sdkmetric.NewView( - sdkmetric.Instrument{Name: processorhelper.BuildCustomMetricName("batch", "batch_send_size")}, - sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, - }}, - ), - sdkmetric.NewView( - sdkmetric.Instrument{Name: processorhelper.BuildCustomMetricName("batch", "batch_send_size_bytes")}, - sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, - 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, - 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, - }}, - ), - } - if disableHighCardinality { - views = append(views, sdkmetric.NewView(sdkmetric.Instrument{ - Scope: instrumentation.Scope{ - Name: GRPCInstrumentation, - }, - }, sdkmetric.Stream{ - AttributeFilter: cardinalityFilter(GRPCUnacceptableKeyValues...), - })) - views = append(views, sdkmetric.NewView(sdkmetric.Instrument{ - Scope: instrumentation.Scope{ - Name: HTTPInstrumentation, - }, - }, sdkmetric.Stream{ - AttributeFilter: cardinalityFilter(HTTPUnacceptableKeyValues...), - })) - } - return views -} - -func cardinalityFilter(kvs ...attribute.KeyValue) attribute.Filter { - filter := attribute.NewSet(kvs...) - return func(kv attribute.KeyValue) bool { - return !filter.HasValue(kv.Key) - } -} - -func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { - promRegistry := prometheus.NewRegistry() - if prometheusConfig.Host == nil { - return nil, nil, fmt.Errorf("host must be specified") - } - if prometheusConfig.Port == nil { - return nil, nil, fmt.Errorf("port must be specified") - } - - opts := []otelprom.Option{ - otelprom.WithRegisterer(promRegistry), - // https://github.com/open-telemetry/opentelemetry-collector/issues/8043 - otelprom.WithoutUnits(), - // Disabled for the moment until this becomes stable, and we are ready to break backwards compatibility. - otelprom.WithoutScopeInfo(), - // This allows us to produce metrics that are backwards compatible w/ opencensus - otelprom.WithoutCounterSuffixes(), - otelprom.WithResourceAsConstantLabels(attribute.NewDenyKeysFilter()), - } - exporter, err := otelprom.New(opts...) - if err != nil { - return nil, nil, fmt.Errorf("error creating otel prometheus exporter: %w", err) - } - - return exporter, InitPrometheusServer(promRegistry, net.JoinHostPort(*prometheusConfig.Host, fmt.Sprintf("%d", *prometheusConfig.Port)), asyncErrorChannel), nil -} - -func initPullExporter(exporter config.MetricExporter, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { - if exporter.Prometheus != nil { - return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel) - } - return nil, nil, errNoValidMetricExporter -} - -func initPeriodicExporter(ctx context.Context, exporter config.MetricExporter, opts ...sdkmetric.PeriodicReaderOption) (sdkmetric.Reader, *http.Server, error) { - if exporter.Console != nil { - enc := json.NewEncoder(os.Stdout) - enc.SetIndent("", " ") - - exp, err := stdoutmetric.New( - stdoutmetric.WithEncoder(enc), - ) - if err != nil { - return nil, nil, err - } - return sdkmetric.NewPeriodicReader(exp, opts...), nil, nil - } - if exporter.OTLP != nil { - var err error - var exp sdkmetric.Exporter - switch exporter.OTLP.Protocol { - case protocolProtobufHTTP: - exp, err = initOTLPHTTPExporter(ctx, exporter.OTLP) - case protocolProtobufGRPC: - exp, err = initOTLPgRPCExporter(ctx, exporter.OTLP) - default: - return nil, nil, fmt.Errorf("unsupported protocol %s", exporter.OTLP.Protocol) - } - if err != nil { - return nil, nil, err - } - return sdkmetric.NewPeriodicReader(exp, opts...), nil, nil - } - return nil, nil, errNoValidMetricExporter -} - -func normalizeEndpoint(endpoint string) string { - if !strings.HasPrefix(endpoint, "https://") && !strings.HasPrefix(endpoint, "http://") { - return fmt.Sprintf("http://%s", endpoint) - } - return endpoint -} - -func initOTLPgRPCExporter(ctx context.Context, otlpConfig *config.OTLPMetric) (sdkmetric.Exporter, error) { - opts := []otlpmetricgrpc.Option{} - - if len(otlpConfig.Endpoint) > 0 { - u, err := url.ParseRequestURI(normalizeEndpoint(otlpConfig.Endpoint)) - if err != nil { - return nil, err - } - opts = append(opts, otlpmetricgrpc.WithEndpoint(u.Host)) - if u.Scheme == "http" { - opts = append(opts, otlpmetricgrpc.WithInsecure()) - } - } - - if otlpConfig.Compression != nil { - switch *otlpConfig.Compression { - case "gzip": - opts = append(opts, otlpmetricgrpc.WithCompressor(*otlpConfig.Compression)) - case "none": - break - default: - return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) - } - } - if otlpConfig.Timeout != nil { - opts = append(opts, otlpmetricgrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) - } - if len(otlpConfig.Headers) > 0 { - opts = append(opts, otlpmetricgrpc.WithHeaders(otlpConfig.Headers)) - } - if otlpConfig.TemporalityPreference != nil { - switch *otlpConfig.TemporalityPreference { - case "delta": - opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(temporalityPreferenceDelta)) - case "cumulative": - opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(temporalityPreferenceCumulative)) - case "lowmemory": - opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(temporalityPreferenceLowMemory)) - default: - return nil, fmt.Errorf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference) - } - } - - return otlpmetricgrpc.New(ctx, opts...) -} - -func initOTLPHTTPExporter(ctx context.Context, otlpConfig *config.OTLPMetric) (sdkmetric.Exporter, error) { - opts := []otlpmetrichttp.Option{} - - if len(otlpConfig.Endpoint) > 0 { - u, err := url.ParseRequestURI(normalizeEndpoint(otlpConfig.Endpoint)) - if err != nil { - return nil, err - } - opts = append(opts, otlpmetrichttp.WithEndpoint(u.Host)) - - if u.Scheme == "http" { - opts = append(opts, otlpmetrichttp.WithInsecure()) - } - if len(u.Path) > 0 { - opts = append(opts, otlpmetrichttp.WithURLPath(u.Path)) - } - } - if otlpConfig.Compression != nil { - switch *otlpConfig.Compression { - case "gzip": - opts = append(opts, otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression)) - case "none": - opts = append(opts, otlpmetrichttp.WithCompression(otlpmetrichttp.NoCompression)) - default: - return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) - } - } - if otlpConfig.Timeout != nil { - opts = append(opts, otlpmetrichttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) - } - if len(otlpConfig.Headers) > 0 { - opts = append(opts, otlpmetrichttp.WithHeaders(otlpConfig.Headers)) - } - if otlpConfig.TemporalityPreference != nil { - switch *otlpConfig.TemporalityPreference { - case "delta": - opts = append(opts, otlpmetrichttp.WithTemporalitySelector(temporalityPreferenceDelta)) - case "cumulative": - opts = append(opts, otlpmetrichttp.WithTemporalitySelector(temporalityPreferenceCumulative)) - case "lowmemory": - opts = append(opts, otlpmetrichttp.WithTemporalitySelector(temporalityPreferenceLowMemory)) - default: - return nil, fmt.Errorf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference) - } - } - - return otlpmetrichttp.New(ctx, opts...) -} - -func temporalityPreferenceCumulative(_ sdkmetric.InstrumentKind) metricdata.Temporality { - return metricdata.CumulativeTemporality -} - -func temporalityPreferenceDelta(ik sdkmetric.InstrumentKind) metricdata.Temporality { - switch ik { - case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindObservableCounter, sdkmetric.InstrumentKindHistogram: - return metricdata.DeltaTemporality - case sdkmetric.InstrumentKindObservableUpDownCounter, sdkmetric.InstrumentKindUpDownCounter: - return metricdata.CumulativeTemporality - default: - return metricdata.DeltaTemporality - } -} - -func temporalityPreferenceLowMemory(ik sdkmetric.InstrumentKind) metricdata.Temporality { - switch ik { - case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindHistogram: - return metricdata.DeltaTemporality - case sdkmetric.InstrumentKindObservableCounter, sdkmetric.InstrumentKindObservableUpDownCounter, sdkmetric.InstrumentKindUpDownCounter: - return metricdata.CumulativeTemporality - default: - return metricdata.DeltaTemporality - } -} diff --git a/service/internal/proctelemetry/config_test.go b/service/internal/proctelemetry/config_test.go deleted file mode 100644 index d0560ac9c8c..00000000000 --- a/service/internal/proctelemetry/config_test.go +++ /dev/null @@ -1,558 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package proctelemetry - -import ( - "context" - "errors" - "net/url" - "reflect" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/contrib/config" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - otelprom "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" -) - -func strPtr(s string) *string { - return &s -} - -func intPtr(i int) *int { - return &i -} - -func TestMetricReader(t *testing.T) { - consoleExporter, err := stdoutmetric.New( - stdoutmetric.WithPrettyPrint(), - ) - require.NoError(t, err) - ctx := context.Background() - otlpGRPCExporter, err := otlpmetricgrpc.New(ctx) - require.NoError(t, err) - otlpHTTPExporter, err := otlpmetrichttp.New(ctx) - require.NoError(t, err) - promExporter, err := otelprom.New() - require.NoError(t, err) - - testCases := []struct { - name string - reader config.MetricReader - args any - wantErr error - wantReader sdkmetric.Reader - }{ - { - name: "noreader", - wantErr: errors.New("unsupported metric reader type { }"), - }, - { - name: "pull prometheus invalid exporter", - reader: config.MetricReader{ - Pull: &config.PullMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{}, - }, - }, - }, - wantErr: errNoValidMetricExporter, - }, - { - name: "pull/prometheus-invalid-config-no-host", - reader: config.MetricReader{ - Pull: &config.PullMetricReader{ - Exporter: config.MetricExporter{ - Prometheus: &config.Prometheus{}, - }, - }, - }, - wantErr: errors.New("host must be specified"), - }, - { - name: "pull/prometheus-invalid-config-no-port", - reader: config.MetricReader{ - Pull: &config.PullMetricReader{ - Exporter: config.MetricExporter{ - Prometheus: &config.Prometheus{ - Host: strPtr("locahost"), - }, - }, - }, - }, - wantErr: errors.New("port must be specified"), - }, - { - name: "pull/prometheus-valid", - reader: config.MetricReader{ - Pull: &config.PullMetricReader{ - Exporter: config.MetricExporter{ - Prometheus: &config.Prometheus{ - Host: strPtr("locahost"), - Port: intPtr(8080), - }, - }, - }, - }, - wantReader: promExporter, - }, - { - name: "periodic/invalid-exporter", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - Prometheus: &config.Prometheus{ - Host: strPtr("locahost"), - Port: intPtr(8080), - }, - }, - }, - }, - wantErr: errNoValidMetricExporter, - }, - { - name: "periodic/no-exporter", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{}, - }, - wantErr: errNoValidMetricExporter, - }, - { - name: "periodic/console-exporter", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - Console: config.Console{}, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(consoleExporter), - }, - { - name: "periodic/console-exporter-with-timeout-interval", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Interval: intPtr(10), - Timeout: intPtr(5), - Exporter: config.MetricExporter{ - Console: config.Console{}, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(consoleExporter), - }, - { - name: "periodic/otlp-exporter-invalid-protocol", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: *strPtr("http/invalid"), - }, - }, - }, - }, - wantErr: errors.New("unsupported protocol http/invalid"), - }, - { - name: "periodic/otlp-grpc-exporter-no-endpoint", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Compression: strPtr("gzip"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), - }, - { - name: "periodic/otlp-grpc-exporter", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Endpoint: "http://localhost:4317", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), - }, - { - name: "periodic/otlp-grpc-exporter-no-scheme", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Endpoint: "localhost:4317", - Compression: strPtr("gzip"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), - }, - { - name: "periodic/otlp-grpc-invalid-endpoint", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Endpoint: " ", - Compression: strPtr("gzip"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantErr: &url.Error{Op: "parse", URL: "http:// ", Err: url.InvalidHostError(" ")}, - }, - { - name: "periodic/otlp-grpc-invalid-compression", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Endpoint: "localhost:4317", - Compression: strPtr("invalid"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantErr: errors.New("unsupported compression \"invalid\""), - }, - { - name: "periodic/otlp-grpc-delta-temporality", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - TemporalityPreference: strPtr("delta"), - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), - }, - { - name: "periodic/otlp-grpc-cumulative-temporality", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - TemporalityPreference: strPtr("cumulative"), - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), - }, - { - name: "periodic/otlp-grpc-lowmemory-temporality", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - TemporalityPreference: strPtr("lowmemory"), - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), - }, - { - name: "periodic/otlp-grpc-invalid-temporality", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "grpc/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - TemporalityPreference: strPtr("invalid"), - }, - }, - }, - }, - wantErr: errors.New("unsupported temporality preference \"invalid\""), - }, - { - name: "periodic/otlp-http-exporter", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: "http://localhost:4318", - Compression: strPtr("gzip"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), - }, - { - name: "periodic/otlp-http-exporter-with-path", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: "http://localhost:4318/path/123", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), - }, - { - name: "periodic/otlp-http-exporter-no-endpoint", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Compression: strPtr("gzip"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), - }, - { - name: "periodic/otlp-http-exporter-no-scheme", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("gzip"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), - }, - { - name: "periodic/otlp-http-invalid-endpoint", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: " ", - Compression: strPtr("gzip"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantErr: &url.Error{Op: "parse", URL: "http:// ", Err: url.InvalidHostError(" ")}, - }, - { - name: "periodic/otlp-http-invalid-compression", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("invalid"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - }, - }, - }, - }, - wantErr: errors.New("unsupported compression \"invalid\""), - }, - { - name: "periodic/otlp-http-cumulative-temporality", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - TemporalityPreference: strPtr("cumulative"), - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), - }, - { - name: "periodic/otlp-http-lowmemory-temporality", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - TemporalityPreference: strPtr("lowmemory"), - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), - }, - { - name: "periodic/otlp-http-delta-temporality", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - TemporalityPreference: strPtr("delta"), - }, - }, - }, - }, - wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), - }, - { - name: "periodic/otlp-http-invalid-temporality", - reader: config.MetricReader{ - Periodic: &config.PeriodicMetricReader{ - Exporter: config.MetricExporter{ - OTLP: &config.OTLPMetric{ - Protocol: "http/protobuf", - Endpoint: "localhost:4318", - Compression: strPtr("none"), - Timeout: intPtr(1000), - Headers: map[string]string{ - "test": "test1", - }, - TemporalityPreference: strPtr("invalid"), - }, - }, - }, - }, - wantErr: errors.New("unsupported temporality preference \"invalid\""), - }, - } - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - gotReader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error)) - - defer func() { - if gotReader != nil { - assert.NoError(t, gotReader.Shutdown(context.Background())) - } - if server != nil { - assert.NoError(t, server.Shutdown(context.Background())) - } - }() - - assert.Equal(t, tt.wantErr, err) - - if tt.wantReader == nil { - assert.Nil(t, gotReader) - } else { - assert.Equal(t, reflect.TypeOf(tt.wantReader), reflect.TypeOf(gotReader)) - - if reflect.TypeOf(tt.wantReader).String() == "*metric.PeriodicReader" { - wantExporterType := reflect.Indirect(reflect.ValueOf(tt.wantReader)).FieldByName("exporter").Elem().Type() - gotExporterType := reflect.Indirect(reflect.ValueOf(gotReader)).FieldByName("exporter").Elem().Type() - assert.Equal(t, wantExporterType, gotExporterType) - } - } - }) - } -} diff --git a/service/service.go b/service/service.go index 88efd1adb91..2a9bc427ac3 100644 --- a/service/service.go +++ b/service/service.go @@ -77,7 +77,7 @@ type Service struct { // New creates a new Service, its telemetry, and Components. func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { - disableHighCard := obsreportconfig.DisableHighCardinalityMetricsfeatureGate.IsEnabled() + // disableHighCard := obsreportconfig.DisableHighCardinalityMetricsfeatureGate.IsEnabled() extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() srv := &Service{ buildInfo: set.BuildInfo, @@ -115,14 +115,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { logger.Info("Setting up own telemetry...") - mp, err := newMeterProvider( - meterProviderSettings{ - res: res, - cfg: cfg.Telemetry.Metrics, - asyncErrorChannel: set.AsyncErrorChannel, - }, - disableHighCard, - ) + mp, err := telFactory.CreateMeterProvider(ctx, telset, &cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to create metric provider: %w", err) } diff --git a/service/telemetry.go b/service/telemetry.go index a44aaa4e4e4..7ecaa848532 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -3,112 +3,7 @@ package service // import "go.opentelemetry.io/collector/service" -import ( - "context" - "net" - "net/http" - "strconv" - - "go.opentelemetry.io/contrib/config" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/service/internal/proctelemetry" - "go.opentelemetry.io/collector/service/telemetry" -) - const ( zapKeyTelemetryAddress = "address" zapKeyTelemetryLevel = "metrics level" ) - -type meterProvider struct { - *sdkmetric.MeterProvider - servers []*http.Server -} - -type meterProviderSettings struct { - res *resource.Resource - cfg telemetry.MetricsConfig - asyncErrorChannel chan error -} - -func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (metric.MeterProvider, error) { - if set.cfg.Level == configtelemetry.LevelNone || (set.cfg.Address == "" && len(set.cfg.Readers) == 0) { - return noop.NewMeterProvider(), nil - } - - if len(set.cfg.Address) != 0 { - host, port, err := net.SplitHostPort(set.cfg.Address) - if err != nil { - return nil, err - } - portInt, err := strconv.Atoi(port) - if err != nil { - return nil, err - } - if set.cfg.Readers == nil { - set.cfg.Readers = []config.MetricReader{} - } - set.cfg.Readers = append(set.cfg.Readers, config.MetricReader{ - Pull: &config.PullMetricReader{ - Exporter: config.MetricExporter{ - Prometheus: &config.Prometheus{ - Host: &host, - Port: &portInt, - }, - }, - }, - }) - } - - mp := &meterProvider{} - var opts []sdkmetric.Option - for _, reader := range set.cfg.Readers { - // https://github.com/open-telemetry/opentelemetry-collector/issues/8045 - r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel) - if err != nil { - return nil, err - } - if server != nil { - mp.servers = append(mp.servers, server) - - } - opts = append(opts, sdkmetric.WithReader(r)) - } - - var err error - mp.MeterProvider, err = proctelemetry.InitOpenTelemetry(set.res, opts, disableHighCardinality) - if err != nil { - return nil, err - } - return mp, nil -} - -// LogAboutServers logs about the servers that are serving metrics. -func (mp *meterProvider) LogAboutServers(logger *zap.Logger, cfg telemetry.MetricsConfig) { - for _, server := range mp.servers { - logger.Info( - "Serving metrics", - zap.String(zapKeyTelemetryAddress, server.Addr), - zap.Stringer(zapKeyTelemetryLevel, cfg.Level), - ) - } -} - -// Shutdown the meter provider and all the associated resources. -// The type signature of this method matches that of the sdkmetric.MeterProvider. -func (mp *meterProvider) Shutdown(ctx context.Context) error { - var errs error - for _, server := range mp.servers { - if server != nil { - errs = multierr.Append(errs, server.Close()) - } - } - return multierr.Append(errs, mp.MeterProvider.Shutdown(ctx)) -} diff --git a/service/telemetry/factory.go b/service/telemetry/factory.go index edcca9e373f..565aea8a908 100644 --- a/service/telemetry/factory.go +++ b/service/telemetry/factory.go @@ -7,6 +7,7 @@ import ( "context" "time" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -55,5 +56,9 @@ func NewFactory() Factory { c := *cfg.(*Config) return newTracerProvider(ctx, set, c) }), + internal.WithMeterProvider(func(ctx context.Context, _ Settings, cfg component.Config) (metric.MeterProvider, error) { + c := *cfg.(*Config) + return newMeterProvider(ctx, c) + }), ) } diff --git a/service/telemetry/meter.go b/service/telemetry/meter.go new file mode 100644 index 00000000000..21b4a2f297f --- /dev/null +++ b/service/telemetry/meter.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "go.opentelemetry.io/collector/telemetry" + +import ( + "context" + "net" + "strconv" + + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/contrib/config" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" +) + +func newMeterProvider(ctx context.Context, cfg Config) (metric.MeterProvider, error) { + if cfg.Metrics.Level == configtelemetry.LevelNone || (cfg.Metrics.Address == "" && len(cfg.Metrics.Readers) == 0) { + return noop.NewMeterProvider(), nil + } + if len(cfg.Metrics.Address) != 0 { + host, port, err := net.SplitHostPort(cfg.Metrics.Address) + if err != nil { + return nil, err + } + portInt, err := strconv.Atoi(port) + if err != nil { + return nil, err + } + if cfg.Metrics.Readers == nil { + cfg.Metrics.Readers = []config.MetricReader{} + } + cfg.Metrics.Readers = append(cfg.Metrics.Readers, config.MetricReader{ + Pull: &config.PullMetricReader{ + Exporter: config.MetricExporter{ + Prometheus: &config.Prometheus{ + Host: &host, + Port: &portInt, + WithoutScopeInfo: ptr(true), + WithoutTypeSuffix: ptr(true), + WithoutUnits: ptr(true), + WithResourceConstantLabels: &config.IncludeExclude{ + Included: []string{"*"}, + }, + }, + }, + }, + }) + } + sdk, err := config.NewSDK( + config.WithContext(ctx), + config.WithOpenTelemetryConfiguration( + config.OpenTelemetryConfiguration{ + MeterProvider: &config.MeterProvider{ + Readers: cfg.Metrics.Readers, + }, + }, + ), + ) + + if err != nil { + return nil, err + } + + return sdk.MeterProvider(), nil +} + +func ptr[T any](val T) *T { + return &val +} diff --git a/service/telemetry_test.go b/service/telemetry_test.go index a8a382c62ec..26bf95ccc3d 100644 --- a/service/telemetry_test.go +++ b/service/telemetry_test.go @@ -5,8 +5,8 @@ package service import ( "context" + "fmt" "net/http" - "net/http/httptest" "testing" io_prometheus_client "github.com/prometheus/client_model/go" @@ -15,12 +15,10 @@ import ( "go.opentelemetry.io/contrib/config" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/internal/testutil" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/internal/proctelemetry" - "go.opentelemetry.io/collector/service/internal/resource" "go.opentelemetry.io/collector/service/telemetry" ) @@ -77,6 +75,9 @@ func TestTelemetryInit(t *testing.T) { "service_instance_id": testInstanceID, }, }, + "promhttp_metric_handler_errors_total": { + value: 0, + }, "target_info": { value: 0, labels: map[string]string{ @@ -115,6 +116,9 @@ func TestTelemetryInit(t *testing.T) { "service_instance_id": testInstanceID, }, }, + "promhttp_metric_handler_errors_total": { + value: 0, + }, "target_info": { value: 0, labels: map[string]string{ @@ -177,6 +181,9 @@ func TestTelemetryInit(t *testing.T) { "service_instance_id": testInstanceID, }, }, + "promhttp_metric_handler_errors_total": { + value: 0, + }, "target_info": { value: 0, labels: map[string]string{ @@ -189,12 +196,13 @@ func TestTelemetryInit(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { + metricsEndpoint := testutil.GetAvailableLocalAddressPrometheus(t) if tc.extendedConfig { tc.cfg.Metrics.Readers = []config.MetricReader{ { Pull: &config.PullMetricReader{ Exporter: config.MetricExporter{ - Prometheus: testutil.GetAvailableLocalAddressPrometheus(t), + Prometheus: metricsEndpoint, }, }, }, @@ -207,16 +215,16 @@ func TestTelemetryInit(t *testing.T) { }, Metrics: telemetry.MetricsConfig{ Level: configtelemetry.LevelDetailed, - Address: testutil.GetAvailableLocalAddress(t), + Address: fmt.Sprintf("%s:%d", *metricsEndpoint.Host, *metricsEndpoint.Port), }, } } - set := meterProviderSettings{ - res: resource.New(component.NewDefaultBuildInfo(), tc.cfg.Resource), - cfg: tc.cfg.Metrics, - asyncErrorChannel: make(chan error), + set := newNopSettings() + telset := telemetry.Settings{ + BuildInfo: set.BuildInfo, + ZapOptions: set.LoggingOptions, } - mp, err := newMeterProvider(set, tc.disableHighCard) + mp, err := telemetry.NewFactory().CreateMeterProvider(context.Background(), telset, tc.cfg) require.NoError(t, err) defer func() { if prov, ok := mp.(interface{ Shutdown(context.Context) error }); ok { @@ -225,8 +233,7 @@ func TestTelemetryInit(t *testing.T) { }() createTestMetrics(t, mp) - - metrics := getMetricsFromPrometheus(t, mp.(*meterProvider).servers[0].Handler) + metrics := getMetricsFromPrometheus(t, fmt.Sprintf("http://%s:%d/metrics", *metricsEndpoint.Host, *metricsEndpoint.Port)) require.Equal(t, len(tc.expectedMetrics), len(metrics)) for metricName, metricValue := range tc.expectedMetrics { @@ -262,13 +269,10 @@ func createTestMetrics(t *testing.T, mp metric.MeterProvider) { httpExampleCounter.Add(context.Background(), 10, metric.WithAttributes(proctelemetry.HTTPUnacceptableKeyValues...)) } -func getMetricsFromPrometheus(t *testing.T, handler http.Handler) map[string]*io_prometheus_client.MetricFamily { - req, err := http.NewRequest(http.MethodGet, "/metrics", nil) +func getMetricsFromPrometheus(t *testing.T, endpoint string) map[string]*io_prometheus_client.MetricFamily { + rr, err := http.Get(endpoint) require.NoError(t, err) - rr := httptest.NewRecorder() - handler.ServeHTTP(rr, req) - var parser expfmt.TextParser parsed, err := parser.TextToMetricFamilies(rr.Body) require.NoError(t, err)