Skip to content

Commit

Permalink
[jaeger-v2] Add support for GRPC storarge (#5228)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #4843
- Separate GRPC storage PR from and will be used by jaeger-v2 Kafka PR
#4971

## Description of the changes
- Implement GRPC storage backend for Jaeger-V2 storage

## How was this change tested?
- Run two `jaegertracing/jaeger-remote-storage` at `17271` and `17281`
ports
- Execute `go run -tags=ui ./cmd/jaeger --config
./cmd/jaeger/grpc_config.yaml`

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: James Ryans <james.ryans2012@gmail.com>
  • Loading branch information
james-ryans committed Feb 27, 2024
1 parent 0c53139 commit 751efdb
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 5 deletions.
35 changes: 35 additions & 0 deletions cmd/jaeger/grpc_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: external-storage
trace_storage_archive: external-storage-archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
grpc:
external-storage:
server: localhost:17271
connection-timeout: 5s
external-storage-archive:
server: localhost:17281
connection-timeout: 5s

receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: external-storage
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
Expand All @@ -29,6 +31,7 @@ type MemoryStorage struct {

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
//nolint:govet // The remoteRPCClient field in GRPC.Configuration contains error type
if reflect.DeepEqual(*cfg, *emptyCfg) {
return fmt.Errorf("%s: no storage type present in config", ID)
} else {
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -109,6 +111,12 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{
ext: s,
storageKind: "grpc",
cfg: s.config.GRPC,
builder: grpc.NewFactoryWithConfig,
}
esStarter := &starter[esCfg.Configuration, *es.Factory]{
ext: s,
storageKind: "elasticsearch",
Expand All @@ -119,6 +127,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
grpcStarter.build,
esStarter.build,
// TODO add support for other backends
}
Expand Down
9 changes: 7 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Configuration struct {
pluginHealthCheck *time.Ticker
pluginHealthCheckDone chan bool
pluginRPCClient plugin.ClientProtocol
remoteConn *grpc.ClientConn
}

// ClientPluginServices defines services plugin can expose and its capabilities
Expand Down Expand Up @@ -78,6 +79,9 @@ func (c *Configuration) Close() error {
c.pluginHealthCheck.Stop()
c.pluginHealthCheckDone <- true
}
if c.remoteConn != nil {
c.remoteConn.Close()
}

return c.RemoteTLS.Close()
}
Expand Down Expand Up @@ -106,12 +110,13 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
var err error
c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
if err != nil {
return nil, fmt.Errorf("error connecting to remote storage: %w", err)
}

grpcClient := shared.NewGRPCClient(conn)
grpcClient := shared.NewGRPCClient(c.remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
Expand Down
15 changes: 15 additions & 0 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ func NewFactory() *Factory {
return &Factory{}
}

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.InitFromOptions(Options{Configuration: cfg})
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.options.AddFlags(flagSet)
Expand Down
27 changes: 27 additions & 0 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package grpc

import (
"errors"
"log"
"net"
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -143,6 +147,29 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCStorageFactoryWithConfig(t *testing.T) {
cfg := grpcConfig.Configuration{}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage")

lis, err := net.Listen("tcp", ":0")
require.NoError(t, err, "failed to listen")

s := grpc.NewServer()
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()
defer s.Stop()

cfg.RemoteServerAddr = lis.Addr().String()
cfg.RemoteConnectTimeout = 1 * time.Second
f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
require.NoError(t, f.Close())
}

func TestGRPCStorageFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
Expand Down
11 changes: 8 additions & 3 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func (s *gRPCServer) Restart() error {

type GRPCStorageIntegrationTestSuite struct {
StorageIntegration
logger *zap.Logger
flags []string
server *gRPCServer
logger *zap.Logger
flags []string
factory *grpc.Factory
server *gRPCServer
}

func (s *GRPCStorageIntegrationTestSuite) initialize() error {
Expand All @@ -120,6 +121,7 @@ func (s *GRPCStorageIntegrationTestSuite) initialize() error {
if err := f.Initialize(metrics.NullFactory, s.logger); err != nil {
return err
}
s.factory = f

if s.SpanWriter, err = f.CreateSpanWriter(); err != nil {
return err
Expand All @@ -140,6 +142,9 @@ func (s *GRPCStorageIntegrationTestSuite) refresh() error {
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp() error {
if err := s.factory.Close(); err != nil {
return err
}
return s.initialize()
}

Expand Down

0 comments on commit 751efdb

Please sign in to comment.