From 751efdb89a5640b2fd525c4ed5a5a2bb596a6ae2 Mon Sep 17 00:00:00 2001 From: James Ryans <46216691+james-ryans@users.noreply.github.com> Date: Tue, 27 Feb 2024 23:43:02 +0700 Subject: [PATCH] [jaeger-v2] Add support for GRPC storarge (#5228) ## Which problem is this PR solving? - Part of #4843 - Separate GRPC storage PR from and will be used by jaeger-v2 Kafka PR #4971 ## Description of the changes - Implement GRPC storage backend for Jaeger-V2 storage ## How was this change tested? - Run two `jaegertracing/jaeger-remote-storage` at `17271` and `17281` ports - Execute `go run -tags=ui ./cmd/jaeger --config ./cmd/jaeger/grpc_config.yaml` ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: James Ryans --- cmd/jaeger/grpc_config.yaml | 35 +++++++++++++++++++ .../extension/jaegerstorage/config.go | 3 ++ .../extension/jaegerstorage/extension.go | 9 +++++ plugin/storage/grpc/config/config.go | 9 +++-- plugin/storage/grpc/factory.go | 15 ++++++++ plugin/storage/grpc/factory_test.go | 27 ++++++++++++++ plugin/storage/integration/grpc_test.go | 11 ++++-- 7 files changed, 104 insertions(+), 5 deletions(-) create mode 100644 cmd/jaeger/grpc_config.yaml diff --git a/cmd/jaeger/grpc_config.yaml b/cmd/jaeger/grpc_config.yaml new file mode 100644 index 00000000000..c982b796646 --- /dev/null +++ b/cmd/jaeger/grpc_config.yaml @@ -0,0 +1,35 @@ +service: + extensions: [jaeger_storage, jaeger_query] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger_storage_exporter] + +extensions: + jaeger_query: + trace_storage: external-storage + trace_storage_archive: external-storage-archive + ui_config: ./cmd/jaeger/config-ui.json + + jaeger_storage: + grpc: + external-storage: + server: localhost:17271 + connection-timeout: 5s + external-storage-archive: + server: localhost:17281 + connection-timeout: 5s + +receivers: + otlp: + protocols: + grpc: + http: + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: external-storage diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index 795331eab2a..87464ef9311 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -10,12 +10,14 @@ import ( esCfg "github.com/jaegertracing/jaeger/pkg/es/config" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" + grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" ) // Config has the configuration for jaeger-query, type Config struct { Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` + GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? @@ -29,6 +31,7 @@ type MemoryStorage struct { func (cfg *Config) Validate() error { emptyCfg := createDefaultConfig().(*Config) + //nolint:govet // The remoteRPCClient field in GRPC.Configuration contains error type if reflect.DeepEqual(*cfg, *emptyCfg) { return fmt.Errorf("%s: no storage type present in config", ID) } else { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index fcecd0c0229..0acf2e274c8 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -19,6 +19,8 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/badger" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/es" + "github.com/jaegertracing/jaeger/plugin/storage/grpc" + grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" ) @@ -109,6 +111,12 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { cfg: s.config.Badger, builder: badger.NewFactoryWithConfig, } + grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{ + ext: s, + storageKind: "grpc", + cfg: s.config.GRPC, + builder: grpc.NewFactoryWithConfig, + } esStarter := &starter[esCfg.Configuration, *es.Factory]{ ext: s, storageKind: "elasticsearch", @@ -119,6 +127,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { builders := []func(ctx context.Context, host component.Host) error{ memStarter.build, badgerStarter.build, + grpcStarter.build, esStarter.build, // TODO add support for other backends } diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index cd1f98ff917..169b143c249 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -50,6 +50,7 @@ type Configuration struct { pluginHealthCheck *time.Ticker pluginHealthCheckDone chan bool pluginRPCClient plugin.ClientProtocol + remoteConn *grpc.ClientConn } // ClientPluginServices defines services plugin can expose and its capabilities @@ -78,6 +79,9 @@ func (c *Configuration) Close() error { c.pluginHealthCheck.Stop() c.pluginHealthCheckDone <- true } + if c.remoteConn != nil { + c.remoteConn.Close() + } return c.RemoteTLS.Close() } @@ -106,12 +110,13 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) } - conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...) + var err error + c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...) if err != nil { return nil, fmt.Errorf("error connecting to remote storage: %w", err) } - grpcClient := shared.NewGRPCClient(conn) + grpcClient := shared.NewGRPCClient(c.remoteConn) return &ClientPluginServices{ PluginServices: shared.PluginServices{ Store: grpcClient, diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index c4656c36ab4..8a9d360018a 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -60,6 +60,21 @@ func NewFactory() *Factory { return &Factory{} } +// NewFactoryWithConfig is used from jaeger(v2). +func NewFactoryWithConfig( + cfg config.Configuration, + metricsFactory metrics.Factory, + logger *zap.Logger, +) (*Factory, error) { + f := NewFactory() + f.InitFromOptions(Options{Configuration: cfg}) + err := f.Initialize(metricsFactory, logger) + if err != nil { + return nil, err + } + return f, nil +} + // AddFlags implements plugin.Configurable func (f *Factory) AddFlags(flagSet *flag.FlagSet) { f.options.AddFlags(flagSet) diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index e9b4417383b..62699eb2e2a 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -16,13 +16,17 @@ package grpc import ( "errors" + "log" + "net" "testing" + "time" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "google.golang.org/grpc" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -143,6 +147,29 @@ func TestGRPCStorageFactory(t *testing.T) { assert.Equal(t, f.store.DependencyReader(), depReader) } +func TestGRPCStorageFactoryWithConfig(t *testing.T) { + cfg := grpcConfig.Configuration{} + _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) + require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage") + + lis, err := net.Listen("tcp", ":0") + require.NoError(t, err, "failed to listen") + + s := grpc.NewServer() + go func() { + if err := s.Serve(lis); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() + defer s.Stop() + + cfg.RemoteServerAddr = lis.Addr().String() + cfg.RemoteConnectTimeout = 1 * time.Second + f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) + require.NoError(t, err) + require.NoError(t, f.Close()) +} + func TestGRPCStorageFactory_Capabilities(t *testing.T) { f := NewFactory() v := viper.New() diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 5b6504561cb..eb8083cb800 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -96,9 +96,10 @@ func (s *gRPCServer) Restart() error { type GRPCStorageIntegrationTestSuite struct { StorageIntegration - logger *zap.Logger - flags []string - server *gRPCServer + logger *zap.Logger + flags []string + factory *grpc.Factory + server *gRPCServer } func (s *GRPCStorageIntegrationTestSuite) initialize() error { @@ -120,6 +121,7 @@ func (s *GRPCStorageIntegrationTestSuite) initialize() error { if err := f.Initialize(metrics.NullFactory, s.logger); err != nil { return err } + s.factory = f if s.SpanWriter, err = f.CreateSpanWriter(); err != nil { return err @@ -140,6 +142,9 @@ func (s *GRPCStorageIntegrationTestSuite) refresh() error { } func (s *GRPCStorageIntegrationTestSuite) cleanUp() error { + if err := s.factory.Close(); err != nil { + return err + } return s.initialize() }