From ee39180bbd6876a28415301e199281387c79bd1d Mon Sep 17 00:00:00 2001 From: James Ryans Date: Sat, 24 Feb 2024 17:32:28 +0700 Subject: [PATCH 1/7] feat: add support for grpc in jaeger-v2 Signed-off-by: James Ryans --- cmd/jaeger/grpc_config.yaml | 35 +++++++++++++++++++ .../extension/jaegerstorage/config.go | 2 ++ .../extension/jaegerstorage/extension.go | 9 +++++ plugin/storage/grpc/factory.go | 15 ++++++++ plugin/storage/grpc/factory_test.go | 6 ++++ 5 files changed, 67 insertions(+) create mode 100644 cmd/jaeger/grpc_config.yaml diff --git a/cmd/jaeger/grpc_config.yaml b/cmd/jaeger/grpc_config.yaml new file mode 100644 index 00000000000..c982b796646 --- /dev/null +++ b/cmd/jaeger/grpc_config.yaml @@ -0,0 +1,35 @@ +service: + extensions: [jaeger_storage, jaeger_query] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger_storage_exporter] + +extensions: + jaeger_query: + trace_storage: external-storage + trace_storage_archive: external-storage-archive + ui_config: ./cmd/jaeger/config-ui.json + + jaeger_storage: + grpc: + external-storage: + server: localhost:17271 + connection-timeout: 5s + external-storage-archive: + server: localhost:17281 + connection-timeout: 5s + +receivers: + otlp: + protocols: + grpc: + http: + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: external-storage diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index db32f6c79cd..f0d71053dce 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -9,12 +9,14 @@ import ( memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" + grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" ) // Config has the configuration for jaeger-query, type Config struct { Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` + GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? // Option: instead of looking for specific name, check interface. diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index a1ac028e772..192aab2c6f1 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -17,6 +17,8 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/badger" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" + "github.com/jaegertracing/jaeger/plugin/storage/grpc" + grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" ) @@ -107,10 +109,17 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { cfg: s.config.Badger, builder: badger.NewFactoryWithConfig, } + grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{ + ext: s, + storageKind: "grpc", + cfg: s.config.GRPC, + builder: grpc.NewFactoryWithConfig, + } builders := []func(ctx context.Context, host component.Host) error{ memStarter.build, badgerStarter.build, + grpcStarter.build, // TODO add support for other backends } for _, builder := range builders { diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index c4656c36ab4..8a9d360018a 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -60,6 +60,21 @@ func NewFactory() *Factory { return &Factory{} } +// NewFactoryWithConfig is used from jaeger(v2). +func NewFactoryWithConfig( + cfg config.Configuration, + metricsFactory metrics.Factory, + logger *zap.Logger, +) (*Factory, error) { + f := NewFactory() + f.InitFromOptions(Options{Configuration: cfg}) + err := f.Initialize(metricsFactory, logger) + if err != nil { + return nil, err + } + return f, nil +} + // AddFlags implements plugin.Configurable func (f *Factory) AddFlags(flagSet *flag.FlagSet) { f.options.AddFlags(flagSet) diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index e9b4417383b..a28553323fc 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -143,6 +143,12 @@ func TestGRPCStorageFactory(t *testing.T) { assert.Equal(t, f.store.DependencyReader(), depReader) } +func TestGRPCStorageFactoryWithConfig(t *testing.T) { + cfg := grpcConfig.Configuration{} + _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) + require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage") +} + func TestGRPCStorageFactory_Capabilities(t *testing.T) { f := NewFactory() v := viper.New() From 2798a45217bc0f45530169f51582a1e6e97a7b61 Mon Sep 17 00:00:00 2001 From: James Ryans Date: Sat, 24 Feb 2024 21:11:31 +0700 Subject: [PATCH 2/7] test: cover a success NewFactoryWithConfig line Signed-off-by: James Ryans --- .../extension/jaegerstorage/config.go | 1 + plugin/storage/grpc/config/config.go | 9 ++++-- plugin/storage/grpc/factory_test.go | 28 +++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index f0d71053dce..199d3c887c0 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -29,6 +29,7 @@ type MemoryStorage struct { func (cfg *Config) Validate() error { emptyCfg := createDefaultConfig().(*Config) + //nolint:govet // The remoteRPCClient field in GRPC.Configuration contains error type if reflect.DeepEqual(*cfg, *emptyCfg) { return fmt.Errorf("%s: no storage type present in config", ID) } else { diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index cd1f98ff917..717ddd69285 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -50,6 +50,7 @@ type Configuration struct { pluginHealthCheck *time.Ticker pluginHealthCheckDone chan bool pluginRPCClient plugin.ClientProtocol + remoteRPCClient *grpc.ClientConn } // ClientPluginServices defines services plugin can expose and its capabilities @@ -78,6 +79,9 @@ func (c *Configuration) Close() error { c.pluginHealthCheck.Stop() c.pluginHealthCheckDone <- true } + if c.remoteRPCClient != nil { + c.remoteRPCClient.Close() + } return c.RemoteTLS.Close() } @@ -106,12 +110,13 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) } - conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...) + var err error + c.remoteRPCClient, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...) if err != nil { return nil, fmt.Errorf("error connecting to remote storage: %w", err) } - grpcClient := shared.NewGRPCClient(conn) + grpcClient := shared.NewGRPCClient(c.remoteRPCClient) return &ClientPluginServices{ PluginServices: shared.PluginServices{ Store: grpcClient, diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index a28553323fc..57008a7174a 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -16,13 +16,17 @@ package grpc import ( "errors" + "log" + "net" "testing" + "time" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "google.golang.org/grpc" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -147,6 +151,30 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) { cfg := grpcConfig.Configuration{} _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage") + + lis, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + + s := grpc.NewServer() + go func() { + if err := s.Serve(lis); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() + defer s.Stop() + + cfg.RemoteServerAddr = lis.Addr().String() + cfg.RemoteConnectTimeout = 1 * time.Second + f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) + defer func() { + err := f.Close() + if err != nil { + log.Fatalf("Client exited with error: %v", err) + } + }() + require.NoError(t, err) } func TestGRPCStorageFactory_Capabilities(t *testing.T) { From 709bb092d067cdb6b828c60a49a326e7d56bad6c Mon Sep 17 00:00:00 2001 From: James Ryans Date: Tue, 27 Feb 2024 21:06:46 +0700 Subject: [PATCH 3/7] refactor remoteConn naming and factory_test require.NoError() Signed-off-by: James Ryans --- plugin/storage/grpc/config/config.go | 10 +++++----- plugin/storage/grpc/factory_test.go | 4 +--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 717ddd69285..169b143c249 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -50,7 +50,7 @@ type Configuration struct { pluginHealthCheck *time.Ticker pluginHealthCheckDone chan bool pluginRPCClient plugin.ClientProtocol - remoteRPCClient *grpc.ClientConn + remoteConn *grpc.ClientConn } // ClientPluginServices defines services plugin can expose and its capabilities @@ -79,8 +79,8 @@ func (c *Configuration) Close() error { c.pluginHealthCheck.Stop() c.pluginHealthCheckDone <- true } - if c.remoteRPCClient != nil { - c.remoteRPCClient.Close() + if c.remoteConn != nil { + c.remoteConn.Close() } return c.RemoteTLS.Close() @@ -111,12 +111,12 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) } var err error - c.remoteRPCClient, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...) + c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...) if err != nil { return nil, fmt.Errorf("error connecting to remote storage: %w", err) } - grpcClient := shared.NewGRPCClient(c.remoteRPCClient) + grpcClient := shared.NewGRPCClient(c.remoteConn) return &ClientPluginServices{ PluginServices: shared.PluginServices{ Store: grpcClient, diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index 57008a7174a..95520dd7ba0 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -153,9 +153,7 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) { require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage") lis, err := net.Listen("tcp", ":0") - if err != nil { - t.Fatalf("failed to listen: %v", err) - } + require.NoError(t, err, "failed to listen") s := grpc.NewServer() go func() { From f50daec287496d6260b78ad8c2ce6900158cddad Mon Sep 17 00:00:00 2001 From: James Ryans Date: Tue, 27 Feb 2024 21:07:40 +0700 Subject: [PATCH 4/7] add coverage to grpc.Configuration Close() from integration test Signed-off-by: James Ryans --- plugin/storage/integration/grpc_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 5b6504561cb..226d86f5b9f 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -96,14 +96,20 @@ func (s *gRPCServer) Restart() error { type GRPCStorageIntegrationTestSuite struct { StorageIntegration - logger *zap.Logger - flags []string - server *gRPCServer + logger *zap.Logger + flags []string + factory *grpc.Factory + server *gRPCServer } func (s *GRPCStorageIntegrationTestSuite) initialize() error { s.logger, _ = testutils.NewLogger() + if s.factory != nil { + if err := s.factory.Close(); err != nil { + return err + } + } if s.server != nil { if err := s.server.Restart(); err != nil { return err @@ -120,6 +126,7 @@ func (s *GRPCStorageIntegrationTestSuite) initialize() error { if err := f.Initialize(metrics.NullFactory, s.logger); err != nil { return err } + s.factory = f if s.SpanWriter, err = f.CreateSpanWriter(); err != nil { return err From 05d2c05e6713f5ced4a87a094bdcea391a33d6e1 Mon Sep 17 00:00:00 2001 From: James Ryans Date: Tue, 27 Feb 2024 21:38:06 +0700 Subject: [PATCH 5/7] fix unsaved merge conflict in jaeger storage config Signed-off-by: James Ryans --- cmd/jaeger/internal/extension/jaegerstorage/config.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index c2f3400b3d7..87464ef9311 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -15,15 +15,10 @@ import ( // Config has the configuration for jaeger-query, type Config struct { -<<<<<<< HEAD - Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` - Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` - GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` -======= Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` + GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` ->>>>>>> main // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? // Option: instead of looking for specific name, check interface. From 992b22374d03ff7794b2dfe4f376abd7c2663d8f Mon Sep 17 00:00:00 2001 From: James Ryans Date: Tue, 27 Feb 2024 23:01:32 +0700 Subject: [PATCH 6/7] refactor factory close from grpc_test to clean-up Signed-off-by: James Ryans --- plugin/storage/integration/grpc_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 226d86f5b9f..eb8083cb800 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -105,11 +105,6 @@ type GRPCStorageIntegrationTestSuite struct { func (s *GRPCStorageIntegrationTestSuite) initialize() error { s.logger, _ = testutils.NewLogger() - if s.factory != nil { - if err := s.factory.Close(); err != nil { - return err - } - } if s.server != nil { if err := s.server.Restart(); err != nil { return err @@ -147,6 +142,9 @@ func (s *GRPCStorageIntegrationTestSuite) refresh() error { } func (s *GRPCStorageIntegrationTestSuite) cleanUp() error { + if err := s.factory.Close(); err != nil { + return err + } return s.initialize() } From cd405efb16ed8eeb0309b31e5ac16b0862fa77b0 Mon Sep 17 00:00:00 2001 From: James Ryans Date: Tue, 27 Feb 2024 23:08:58 +0700 Subject: [PATCH 7/7] refactor grpc factory_test Signed-off-by: James Ryans --- plugin/storage/grpc/factory_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index 95520dd7ba0..62699eb2e2a 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -166,13 +166,8 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) { cfg.RemoteServerAddr = lis.Addr().String() cfg.RemoteConnectTimeout = 1 * time.Second f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) - defer func() { - err := f.Close() - if err != nil { - log.Fatalf("Client exited with error: %v", err) - } - }() require.NoError(t, err) + require.NoError(t, f.Close()) } func TestGRPCStorageFactory_Capabilities(t *testing.T) {