diff --git a/cmd/jaeger/badger_config.yaml b/cmd/jaeger/badger_config.yaml new file mode 100644 index 00000000000..08982305e2a --- /dev/null +++ b/cmd/jaeger/badger_config.yaml @@ -0,0 +1,41 @@ +service: + extensions: [jaeger_storage, jaeger_query] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger_storage_exporter] + +extensions: + jaeger_query: + trace_storage: memstore + trace_storage_archive: memstore_archive + ui_config: ./cmd/jaeger/config-ui.json + + jaeger_storage: + badger_primary: + memstore: + directory_key: "/tmp/jaeger/" + directory_value: "/tmp/jaeger/" + ephemeral: false + maintenance_interval: 5 + metrics_update_interval: 10 + memstore_archive: + directory_key: "/tmp/jaeger_archive/" + directory_value: "/tmp/jaeger_archive/" + ephemeral: false + maintenance_interval: 5 + metrics_update_interval: 10 + +receivers: + otlp: + protocols: + grpc: + http: + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: memstore diff --git a/cmd/jaeger/integration/receivers/storagereceiver/.nocover b/cmd/jaeger/integration/receivers/storagereceiver/.nocover new file mode 100644 index 00000000000..4c28ea5d152 --- /dev/null +++ b/cmd/jaeger/integration/receivers/storagereceiver/.nocover @@ -0,0 +1 @@ +FIXME \ No newline at end of file diff --git a/cmd/jaeger/integration/receivers/storagereceiver/config.go b/cmd/jaeger/integration/receivers/storagereceiver/config.go new file mode 100644 index 00000000000..a28b3b38f43 --- /dev/null +++ b/cmd/jaeger/integration/receivers/storagereceiver/config.go @@ -0,0 +1,19 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package storagereceiver + +import ( + "github.com/asaskevich/govalidator" + + badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" +) + +type Config struct { + Badger badgerCfg.NamespaceConfig `mapstructure:"badger"` +} + +func (cfg *Config) Validate() error { + _, err := govalidator.ValidateStruct(cfg) + return err +} diff --git a/cmd/jaeger/integration/receivers/storagereceiver/factory.go b/cmd/jaeger/integration/receivers/storagereceiver/factory.go new file mode 100644 index 00000000000..5fad34c4eba --- /dev/null +++ b/cmd/jaeger/integration/receivers/storagereceiver/factory.go @@ -0,0 +1,32 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package storagereceiver + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" +) + +const componentType = component.Type("jaeger_storage_receiver") + +func NewFactory() receiver.Factory { + return receiver.NewFactory( + componentType, + createDefaultConfig, + receiver.WithTraces(createTraces, component.StabilityLevelDevelopment), + ) +} + +func createDefaultConfig() component.Config { + return &Config{} +} + +func createTraces(ctx context.Context, set receiver.CreateSettings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { + cfg := config.(*Config) + + return newReceiver(cfg, set.TelemetrySettings, nextConsumer) +} diff --git a/cmd/jaeger/integration/receivers/storagereceiver/receiver.go b/cmd/jaeger/integration/receivers/storagereceiver/receiver.go new file mode 100644 index 00000000000..e5c6eef7704 --- /dev/null +++ b/cmd/jaeger/integration/receivers/storagereceiver/receiver.go @@ -0,0 +1,135 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package storagereceiver + +import ( + "context" + "fmt" + + jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/storage/badger" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +type storageReceiver struct { + cancelConsumeLoop context.CancelFunc + config *Config + logger *zap.Logger + consumedTraces map[model.TraceID]*consumedTrace + nextConsumer consumer.Traces + spanReader spanstore.Reader +} + +type consumedTrace struct { + spanIDs map[model.SpanID]struct{} +} + +func newReceiver(config *Config, otel component.TelemetrySettings, nextConsumer consumer.Traces) (*storageReceiver, error) { + f, err := badger.NewFactoryWithConfig( + config.Badger, + metrics.NullFactory, + otel.Logger, + ) + if err != nil { + return nil, fmt.Errorf("failed to init storage factory: %w", err) + } + + spanReader, err := f.CreateSpanReader() + if err != nil { + return nil, fmt.Errorf("failed to create span reader: %w", err) + } + + return &storageReceiver{ + config: config, + logger: otel.Logger, + consumedTraces: make(map[model.TraceID]*consumedTrace), + nextConsumer: nextConsumer, + spanReader: spanReader, + }, nil +} + +func (r *storageReceiver) Start(_ context.Context, host component.Host) error { + ctx, cancel := context.WithCancel(context.Background()) + r.cancelConsumeLoop = cancel + + go func() { + if err := r.consumeLoop(ctx); err != nil { + host.ReportFatalError(err) + } + }() + + return nil +} + +func (r *storageReceiver) consumeLoop(ctx context.Context) error { + services := []string{"", "customers", "OTLPResourceNoServiceName"} + + for { + for _, svc := range services { + if err := r.consumeTraces(ctx, svc); err != nil { + r.logger.Error("Error from consumer", zap.Error(err)) + } + if ctx.Err() != nil { + r.logger.Error("Consumer stopped", zap.Error(ctx.Err())) + return ctx.Err() + } + } + } +} + +func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error { + traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{ + ServiceName: serviceName, + }) + if err != nil { + return err + } + + cnt := 0 + for _, trace := range traces { + cnt += len(trace.Spans) + traceID := trace.Spans[0].TraceID + if _, ok := r.consumedTraces[traceID]; !ok { + r.consumedTraces[traceID] = &consumedTrace{ + spanIDs: make(map[model.SpanID]struct{}), + } + } + if len(trace.Spans) > len(r.consumedTraces[traceID].spanIDs) { + r.consumeSpans(ctx, r.consumedTraces[traceID], trace.Spans) + } + } + + return nil +} + +func (r *storageReceiver) consumeSpans(ctx context.Context, tc *consumedTrace, spans []*model.Span) error { + for _, span := range spans { + if _, ok := tc.spanIDs[span.SpanID]; !ok { + tc.spanIDs[span.SpanID] = struct{}{} + td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ + { + Spans: []*model.Span{span}, + Process: span.Process, + }, + }) + if err != nil { + return err + } + r.nextConsumer.ConsumeTraces(ctx, td) + } + } + + return nil +} + +func (r *storageReceiver) Shutdown(_ context.Context) error { + r.cancelConsumeLoop() + return nil +} diff --git a/cmd/jaeger/internal/command.go b/cmd/jaeger/internal/command.go index 072a05e8cc1..b53e22e4307 100644 --- a/cmd/jaeger/internal/command.go +++ b/cmd/jaeger/internal/command.go @@ -29,7 +29,7 @@ func Command() *cobra.Command { settings := otelcol.CollectorSettings{ BuildInfo: info, - Factories: components, + Factories: Components, } cmd := otelcol.NewCommand(settings) diff --git a/cmd/jaeger/internal/components.go b/cmd/jaeger/internal/components.go index 2b758a55990..d98582023ba 100644 --- a/cmd/jaeger/internal/components.go +++ b/cmd/jaeger/internal/components.go @@ -116,6 +116,6 @@ func (b builders) build() (otelcol.Factories, error) { return factories, nil } -func components() (otelcol.Factories, error) { +func Components() (otelcol.Factories, error) { return defaultBuilders().build() } diff --git a/cmd/jaeger/internal/components_test.go b/cmd/jaeger/internal/components_test.go index fed360d0bdd..fd333453691 100644 --- a/cmd/jaeger/internal/components_test.go +++ b/cmd/jaeger/internal/components_test.go @@ -29,7 +29,7 @@ import ( ) func TestComponents(t *testing.T) { - factories, err := components() + factories, err := Components() require.NoError(t, err) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index ff97439a682..5c60869d9bc 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -8,11 +8,13 @@ import ( "reflect" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" + badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" ) // Config has the configuration for jaeger-query, type Config struct { - Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` + Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` + Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger_primary"` // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? // Option: instead of looking for specific name, check interface. @@ -23,6 +25,11 @@ type MemoryStorage struct { memoryCfg.Configuration } +type BadgerStorage struct { + Name string `mapstructure:"name"` + badgerCfg.NamespaceConfig +} + func (cfg *Config) Validate() error { emptyCfg := createDefaultConfig().(*Config) if reflect.DeepEqual(*cfg, *emptyCfg) { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 60db1d9ec9e..ba92bdfa6f8 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -14,6 +14,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" ) @@ -70,6 +71,22 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { s.logger.With(zap.String("storage_name", name)), ) } + + for name, b := range s.config.Badger { + if _, ok := s.factories[name]; ok { + return fmt.Errorf("duplicate badger storage name %s", name) + } + var err error + s.factories[name], err = badger.NewFactoryWithConfig( + b, + metrics.NullFactory, + s.logger.With(zap.String("storage_name", name)), + ) + if err != nil { + return fmt.Errorf("failed to initialize badger storage: %w", err) + } + } + // TODO add support for other backends return nil } diff --git a/plugin/storage/badger/factory.go b/plugin/storage/badger/factory.go index f48c163bc5c..5b2d10b1278 100644 --- a/plugin/storage/badger/factory.go +++ b/plugin/storage/badger/factory.go @@ -91,6 +91,20 @@ func NewFactory() *Factory { } } +func NewFactoryWithConfig( + cfg NamespaceConfig, + metricsFactory metrics.Factory, + logger *zap.Logger, +) (*Factory, error) { + f := NewFactory() + f.InitFromOptions(Options{Primary: cfg}) + err := f.Initialize(metricsFactory, logger) + if err != nil { + return nil, err + } + return f, nil +} + // AddFlags implements plugin.Configurable func (f *Factory) AddFlags(flagSet *flag.FlagSet) { f.Options.AddFlags(flagSet)