Skip to content

Commit

Permalink
support-badger-in-jaeger-v2
Browse files Browse the repository at this point in the history
Signed-off-by: Harshvir Potpose <hpotpose62@gmail.com>
  • Loading branch information
akagami-harsh committed Jan 21, 2024
1 parent 6662e1c commit 522e0ca
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 4 deletions.
41 changes: 41 additions & 0 deletions cmd/jaeger/badger_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: memstore
trace_storage_archive: memstore_archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
badger_primary:
memstore:
directory_key: "/tmp/jaeger/"
directory_value: "/tmp/jaeger/"
ephemeral: false
maintenance_interval: 5
metrics_update_interval: 10
memstore_archive:
directory_key: "/tmp/jaeger_archive/"
directory_value: "/tmp/jaeger_archive/"
ephemeral: false
maintenance_interval: 5
metrics_update_interval: 10

receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: memstore
1 change: 1 addition & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FIXME
19 changes: 19 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"github.com/asaskevich/govalidator"

badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
)

type Config struct {
Badger badgerCfg.NamespaceConfig `mapstructure:"badger"`
}

func (cfg *Config) Validate() error {
_, err := govalidator.ValidateStruct(cfg)
return err
}
32 changes: 32 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const componentType = component.Type("jaeger_storage_receiver")

func NewFactory() receiver.Factory {
return receiver.NewFactory(
componentType,
createDefaultConfig,
receiver.WithTraces(createTraces, component.StabilityLevelDevelopment),
)
}

func createDefaultConfig() component.Config {
return &Config{}
}

func createTraces(ctx context.Context, set receiver.CreateSettings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) {
cfg := config.(*Config)

return newReceiver(cfg, set.TelemetrySettings, nextConsumer)
}
135 changes: 135 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package storagereceiver

import (
"context"
"fmt"

jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type storageReceiver struct {
cancelConsumeLoop context.CancelFunc
config *Config
logger *zap.Logger
consumedTraces map[model.TraceID]*consumedTrace
nextConsumer consumer.Traces
spanReader spanstore.Reader
}

type consumedTrace struct {
spanIDs map[model.SpanID]struct{}
}

func newReceiver(config *Config, otel component.TelemetrySettings, nextConsumer consumer.Traces) (*storageReceiver, error) {
f, err := badger.NewFactoryWithConfig(
config.Badger,
metrics.NullFactory,
otel.Logger,
)
if err != nil {
return nil, fmt.Errorf("failed to init storage factory: %w", err)
}

spanReader, err := f.CreateSpanReader()
if err != nil {
return nil, fmt.Errorf("failed to create span reader: %w", err)
}

return &storageReceiver{
config: config,
logger: otel.Logger,
consumedTraces: make(map[model.TraceID]*consumedTrace),
nextConsumer: nextConsumer,
spanReader: spanReader,
}, nil
}

func (r *storageReceiver) Start(_ context.Context, host component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
r.cancelConsumeLoop = cancel

go func() {
if err := r.consumeLoop(ctx); err != nil {
host.ReportFatalError(err)
}
}()

return nil
}

func (r *storageReceiver) consumeLoop(ctx context.Context) error {
services := []string{"", "customers", "OTLPResourceNoServiceName"}

for {
for _, svc := range services {
if err := r.consumeTraces(ctx, svc); err != nil {
r.logger.Error("Error from consumer", zap.Error(err))
}
if ctx.Err() != nil {
r.logger.Error("Consumer stopped", zap.Error(ctx.Err()))
return ctx.Err()
}
}
}
}

func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error {
traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{
ServiceName: serviceName,
})
if err != nil {
return err
}

cnt := 0
for _, trace := range traces {
cnt += len(trace.Spans)
traceID := trace.Spans[0].TraceID
if _, ok := r.consumedTraces[traceID]; !ok {
r.consumedTraces[traceID] = &consumedTrace{
spanIDs: make(map[model.SpanID]struct{}),
}
}
if len(trace.Spans) > len(r.consumedTraces[traceID].spanIDs) {
r.consumeSpans(ctx, r.consumedTraces[traceID], trace.Spans)
}
}

return nil
}

func (r *storageReceiver) consumeSpans(ctx context.Context, tc *consumedTrace, spans []*model.Span) error {
for _, span := range spans {
if _, ok := tc.spanIDs[span.SpanID]; !ok {
tc.spanIDs[span.SpanID] = struct{}{}
td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{
{
Spans: []*model.Span{span},
Process: span.Process,
},
})
if err != nil {
return err
}
r.nextConsumer.ConsumeTraces(ctx, td)
}
}

return nil
}

func (r *storageReceiver) Shutdown(_ context.Context) error {
r.cancelConsumeLoop()
return nil
}
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Command() *cobra.Command {

settings := otelcol.CollectorSettings{
BuildInfo: info,
Factories: components,
Factories: Components,
}

cmd := otelcol.NewCommand(settings)
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ func (b builders) build() (otelcol.Factories, error) {
return factories, nil
}

func components() (otelcol.Factories, error) {
func Components() (otelcol.Factories, error) {
return defaultBuilders().build()
}
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func TestComponents(t *testing.T) {
factories, err := components()
factories, err := Components()

require.NoError(t, err)

Expand Down
9 changes: 8 additions & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"reflect"

memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger_primary"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand All @@ -23,6 +25,11 @@ type MemoryStorage struct {
memoryCfg.Configuration
}

type BadgerStorage struct {
Name string `mapstructure:"name"`
badgerCfg.NamespaceConfig
}

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
if reflect.DeepEqual(*cfg, *emptyCfg) {
Expand Down
17 changes: 17 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -70,6 +71,22 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
s.logger.With(zap.String("storage_name", name)),
)
}

for name, b := range s.config.Badger {
if _, ok := s.factories[name]; ok {
return fmt.Errorf("duplicate badger storage name %s", name)
}
var err error
s.factories[name], err = badger.NewFactoryWithConfig(
b,
metrics.NullFactory,
s.logger.With(zap.String("storage_name", name)),
)
if err != nil {
return fmt.Errorf("failed to initialize badger storage: %w", err)
}
}

// TODO add support for other backends
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ func NewFactory() *Factory {
}
}

func NewFactoryWithConfig(
cfg NamespaceConfig,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.InitFromOptions(Options{Primary: cfg})
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.Options.AddFlags(flagSet)
Expand Down

0 comments on commit 522e0ca

Please sign in to comment.