diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md b/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md deleted file mode 100644 index 30931adaf0f..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# Storage Receiver - -`storagereceiver` is a fake receiver that creates an artificial stream of traces by: - -- repeatedly querying one of Jaeger storage backends for all traces (by service). -- tracking new traces / spans and passing them to the next component in the pipeline. - -# Getting Started - -The following settings are required: - -- `trace_storage` (no default): name of a storage backend defined in `jaegerstorage` extension - -The following settings can be optionally configured: - -- `pull_interval` (default = 0s): The delay between each iteration of pulling traces. - -```yaml -receivers: - jaeger_storage_receiver: - trace_storage: external-storage - pull_interval: 0s -``` diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go deleted file mode 100644 index e9319b8991d..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "time" - - "github.com/asaskevich/govalidator" -) - -type Config struct { - TraceStorage string `valid:"required" mapstructure:"trace_storage"` - PullInterval time.Duration `mapstructure:"pull_interval"` -} - -func (cfg *Config) Validate() error { - _, err := govalidator.ValidateStruct(cfg) - return err -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go deleted file mode 100644 index 98435b3e2cf..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "errors" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap/confmaptest" -) - -func TestLoadConfig(t *testing.T) { - t.Parallel() - - cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) - require.NoError(t, err) - - tests := []struct { - id component.ID - expected component.Config - expectedErr error - }{ - { - id: component.NewIDWithName(componentType, ""), - expectedErr: errors.New("non zero value required"), - }, - { - id: component.NewIDWithName(componentType, "defaults"), - expected: &Config{ - TraceStorage: "storage", - PullInterval: 0, - }, - }, - { - id: component.NewIDWithName(componentType, "filled"), - expected: &Config{ - TraceStorage: "storage", - PullInterval: 2 * time.Second, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.id.String(), func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - sub, err := cm.Sub(tt.id.String()) - require.NoError(t, err) - require.NoError(t, sub.Unmarshal(cfg)) - - if tt.expectedErr != nil { - require.ErrorContains(t, component.ValidateConfig(cfg), tt.expectedErr.Error()) - } else { - require.NoError(t, component.ValidateConfig(cfg)) - assert.Equal(t, tt.expected, cfg) - } - }) - } -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go deleted file mode 100644 index 09abc498e72..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" -) - -// componentType is the name of this extension in configuration. -var componentType = component.MustNewType("jaeger_storage_receiver") - -// ID is the identifier of this extension. -var ID = component.NewID(componentType) - -func NewFactory() receiver.Factory { - return receiver.NewFactory( - componentType, - createDefaultConfig, - receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), - ) -} - -func createDefaultConfig() component.Config { - return &Config{} -} - -func createTracesReceiver(_ context.Context, set receiver.Settings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { - cfg := config.(*Config) - - return newTracesReceiver(cfg, set, nextConsumer) -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go deleted file mode 100644 index 8b94fa2304b..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/receiver/receivertest" -) - -func TestCreateDefaultConfig(t *testing.T) { - cfg := createDefaultConfig().(*Config) - require.NotNil(t, cfg, "failed to create default config") - require.NoError(t, componenttest.CheckConfigStruct(cfg)) -} - -func TestCreateTracesReceiver(t *testing.T) { - cfg := createDefaultConfig().(*Config) - f := NewFactory() - r, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, r) -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go deleted file mode 100644 index 4dbecd011d3..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "testing" - - "github.com/jaegertracing/jaeger/pkg/testutils" -) - -func TestMain(m *testing.M) { - testutils.VerifyGoLeaks(m) -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go deleted file mode 100644 index d6f93ef5ccd..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - "fmt" - "time" - - jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" - "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/storage/spanstore" -) - -type storageReceiver struct { - cancelConsumeLoop context.CancelFunc - config *Config - settings receiver.Settings - consumedTraces map[model.TraceID]*consumedTrace - nextConsumer consumer.Traces - spanReader spanstore.Reader -} - -type consumedTrace struct { - spanIDs map[model.SpanID]struct{} -} - -func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Traces) (*storageReceiver, error) { - return &storageReceiver{ - config: config, - settings: set, - consumedTraces: make(map[model.TraceID]*consumedTrace), - nextConsumer: nextConsumer, - }, nil -} - -func (r *storageReceiver) Start(ctx context.Context, host component.Host) error { - f, err := jaegerstorage.GetStorageFactory(r.config.TraceStorage, host) - if err != nil { - return fmt.Errorf("cannot find storage factory: %w", err) - } - - if r.spanReader, err = f.CreateSpanReader(); err != nil { - return fmt.Errorf("cannot create span reader: %w", err) - } - - ctx, cancel := context.WithCancel(ctx) - r.cancelConsumeLoop = cancel - - go func() { - if err := r.consumeLoop(ctx); err != nil { - r.settings.ReportStatus(component.NewFatalErrorEvent(err)) - } - }() - - return nil -} - -func (r *storageReceiver) consumeLoop(ctx context.Context) error { - for { - services, err := r.spanReader.GetServices(ctx) - if err != nil { - r.settings.Logger.Error("Failed to get services from consumer", zap.Error(err)) - return err - } - - for _, svc := range services { - if err := r.consumeTraces(ctx, svc); err != nil { - r.settings.Logger.Error("Failed to consume traces from consumer", zap.Error(err)) - } - } - - select { - case <-ctx.Done(): - r.settings.Logger.Info("Consumer stopped") - return nil - default: - time.Sleep(r.config.PullInterval) - } - } -} - -func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error { - endTime := time.Now() - traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{ - ServiceName: serviceName, - StartTimeMin: endTime.Add(-1 * time.Hour), - StartTimeMax: endTime, - }) - if err != nil { - return err - } - - for _, trace := range traces { - traceID := trace.Spans[0].TraceID - if _, ok := r.consumedTraces[traceID]; !ok { - r.consumedTraces[traceID] = &consumedTrace{ - spanIDs: make(map[model.SpanID]struct{}), - } - } - r.consumeSpans(ctx, r.consumedTraces[traceID], trace.Spans) - } - - return nil -} - -func (r *storageReceiver) consumeSpans(ctx context.Context, tc *consumedTrace, spans []*model.Span) error { - // Spans are consumed one at a time because we don't know whether all spans - // in a trace have been completely exported - for _, span := range spans { - if _, ok := tc.spanIDs[span.SpanID]; !ok { - tc.spanIDs[span.SpanID] = struct{}{} - td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ - { - Spans: []*model.Span{span}, - Process: span.Process, - }, - }) - if err != nil { - return err - } - r.nextConsumer.ConsumeTraces(ctx, td) - } - } - - return nil -} - -func (r *storageReceiver) Shutdown(_ context.Context) error { - if r.cancelConsumeLoop != nil { - r.cancelConsumeLoop() - } - return nil -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml b/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml deleted file mode 100644 index e590e8f1694..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml +++ /dev/null @@ -1,6 +0,0 @@ -jaeger_storage_receiver: -jaeger_storage_receiver/defaults: - trace_storage: storage -jaeger_storage_receiver/filled: - trace_storage: storage - pull_interval: 2s