From c840816c98222530b70b1cb37757937bd57722ed Mon Sep 17 00:00:00 2001 From: Harshvir Potpose <122517264+akagami-harsh@users.noreply.github.com> Date: Tue, 21 May 2024 01:30:43 +0530 Subject: [PATCH] Create new grpc storage configuration to align with OTEL (#5331) ## Which problem is this PR solving? - part of #5229 ## Description of the changes - added more grpc storage client configuration to align with otel ``` configgrpc.ClientConfig `mapstructure:",squash"` exporterhelper.TimeoutSettings `mapstructure:",squash"` ``` These are not all the configs, but i'll add more based on feedback on this initial approach. ## How was this change tested? - not tested yet ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Harshvir Potpose Signed-off-by: Harshvir Potpose <122517264+akagami-harsh@users.noreply.github.com> Signed-off-by: Yuri Shkuro Co-authored-by: Yuri Shkuro --- cmd/jaeger/config-remote-storage.yaml | 6 +- .../extension/jaegerstorage/config.go | 2 +- .../extension/jaegerstorage/extension.go | 2 +- pkg/config/tlscfg/options.go | 18 + pkg/config/tlscfg/options_test.go | 56 +++ plugin/storage/grpc/config/config.go | 82 +++-- plugin/storage/grpc/config/config_test.go | 9 +- plugin/storage/grpc/factory.go | 75 ++-- plugin/storage/grpc/factory_test.go | 334 ++++++++---------- plugin/storage/grpc/options.go | 19 +- plugin/storage/grpc/options_test.go | 43 +-- 11 files changed, 334 insertions(+), 312 deletions(-) diff --git a/cmd/jaeger/config-remote-storage.yaml b/cmd/jaeger/config-remote-storage.yaml index 0da873500bd..98a438bd779 100644 --- a/cmd/jaeger/config-remote-storage.yaml +++ b/cmd/jaeger/config-remote-storage.yaml @@ -14,8 +14,10 @@ extensions: jaeger_storage: grpc: external-storage: - server: localhost:17271 - connection-timeout: 5s + endpoint: localhost:17271 + timeout: 5s + tls: + insecure: true receivers: otlp: diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index 542c5a6437a..1e6128cda93 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -18,7 +18,7 @@ import ( type Config struct { Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` - GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` + GRPC map[string]grpcCfg.ConfigV2 `mapstructure:"grpc"` Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"` Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` Cassandra map[string]cassandra.Options `mapstructure:"cassandra"` diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index a5442b82ff1..23709c00d3b 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -117,7 +117,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { cfg: s.config.Badger, builder: badger.NewFactoryWithConfig, } - grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{ + grpcStarter := &starter[grpcCfg.ConfigV2, *grpc.Factory]{ ext: s, storageKind: "grpc", cfg: s.config.GRPC, diff --git a/pkg/config/tlscfg/options.go b/pkg/config/tlscfg/options.go index 2707887831d..68739e2470b 100644 --- a/pkg/config/tlscfg/options.go +++ b/pkg/config/tlscfg/options.go @@ -23,6 +23,7 @@ import ( "path/filepath" "time" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" ) @@ -135,6 +136,23 @@ func (p Options) loadCertPool() (*x509.CertPool, error) { return certPool, nil } +func (o *Options) ToOtelClientConfig() configtls.ClientConfig { + return configtls.ClientConfig{ + Insecure: !o.Enabled, + InsecureSkipVerify: o.SkipHostVerify, + ServerName: o.ServerName, + Config: configtls.Config{ + CAFile: o.CAPath, + CertFile: o.CertPath, + KeyFile: o.KeyPath, + CipherSuites: o.CipherSuites, + MinVersion: o.MinVersion, + MaxVersion: o.MaxVersion, + ReloadInterval: o.ReloadInterval, + }, + } +} + func addCertToPool(caPath string, certPool *x509.CertPool) error { caPEM, err := os.ReadFile(filepath.Clean(caPath)) if err != nil { diff --git a/pkg/config/tlscfg/options_test.go b/pkg/config/tlscfg/options_test.go index 81d146c5210..44ad205fdc0 100644 --- a/pkg/config/tlscfg/options_test.go +++ b/pkg/config/tlscfg/options_test.go @@ -20,9 +20,11 @@ import ( "fmt" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" ) @@ -186,3 +188,57 @@ func TestOptionsToConfig(t *testing.T) { }) } } + +func TestToOtelClientConfig(t *testing.T) { + testCases := []struct { + name string + options Options + expected configtls.ClientConfig + }{ + { + name: "insecure", + options: Options{ + Enabled: false, + }, + expected: configtls.ClientConfig{ + Insecure: true, + }, + }, + { + name: "secure with skip host verify", + options: Options{ + Enabled: true, + SkipHostVerify: true, + ServerName: "example.com", + CAPath: "path/to/ca.pem", + CertPath: "path/to/cert.pem", + KeyPath: "path/to/key.pem", + CipherSuites: []string{"TLS_RSA_WITH_AES_128_CBC_SHA"}, + MinVersion: "1.2", + MaxVersion: "1.3", + ReloadInterval: 24 * time.Hour, + }, + expected: configtls.ClientConfig{ + Insecure: false, + InsecureSkipVerify: true, + ServerName: "example.com", + Config: configtls.Config{ + CAFile: "path/to/ca.pem", + CertFile: "path/to/cert.pem", + KeyFile: "path/to/key.pem", + CipherSuites: []string{"TLS_RSA_WITH_AES_128_CBC_SHA"}, + MinVersion: "1.2", + MaxVersion: "1.3", + ReloadInterval: 24 * time.Hour, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := tc.options.ToOtelClientConfig() + assert.Equal(t, tc.expected, actual) + }) + } +} diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index d82c9779648..33b274a8873 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -15,16 +15,18 @@ package config import ( - "errors" + "context" "fmt" "time" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/tenancy" @@ -37,57 +39,66 @@ type Configuration struct { RemoteTLS tlscfg.Options RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"` TenancyOpts tenancy.Options +} + +type ConfigV2 struct { + Tenancy tenancy.Options `mapstructure:"multi_tenancy"` + configgrpc.ClientConfig `mapstructure:",squash"` + exporterhelper.TimeoutSettings `mapstructure:",squash"` +} - remoteConn *grpc.ClientConn +func (c *Configuration) TranslateToConfigV2() *ConfigV2 { + return &ConfigV2{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: c.RemoteServerAddr, + TLSSetting: c.RemoteTLS.ToOtelClientConfig(), + }, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: c.RemoteConnectTimeout, + }, + } } // ClientPluginServices defines services plugin can expose and its capabilities type ClientPluginServices struct { shared.PluginServices Capabilities shared.PluginCapabilities + remoteConn *grpc.ClientConn } -// PluginBuilder is used to create storage plugins. Implemented by Configuration. -// TODO this interface should be removed and the building capability moved to Factory. -type PluginBuilder interface { - Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) - Close() error -} - -// Build instantiates a PluginServices -func (c *Configuration) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) { - return c.buildRemote(logger, tracerProvider, grpc.NewClient) +// TODO move this to factory.go +func (c *ConfigV2) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) { + telset := component.TelemetrySettings{ + Logger: logger, + TracerProvider: tracerProvider, + } + newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + return c.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...) + } + return newRemoteStorage(c, telset, newClientFn) } -type newClientFn func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) +type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error) -func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.TracerProvider, newClient newClientFn) (*ClientPluginServices, error) { +func newRemoteStorage(c *ConfigV2, telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) { opts := []grpc.DialOption{ - grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))), + grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))), } - if c.RemoteTLS.Enabled { - tlsCfg, err := c.RemoteTLS.Config(logger) - if err != nil { - return nil, err - } - creds := credentials.NewTLS(tlsCfg) - opts = append(opts, grpc.WithTransportCredentials(creds)) - } else { - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + if c.Auth != nil { + return nil, fmt.Errorf("authenticator is not supported") } - tenancyMgr := tenancy.NewManager(&c.TenancyOpts) + tenancyMgr := tenancy.NewManager(&c.Tenancy) if tenancyMgr.Enabled { opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) } - var err error - c.remoteConn, err = newClient(c.RemoteServerAddr, opts...) + + remoteConn, err := newClient(opts...) if err != nil { return nil, fmt.Errorf("error creating remote storage client: %w", err) } - - grpcClient := shared.NewGRPCClient(c.remoteConn) + grpcClient := shared.NewGRPCClient(remoteConn) return &ClientPluginServices{ PluginServices: shared.PluginServices{ Store: grpcClient, @@ -95,14 +106,13 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra StreamingSpanWriter: grpcClient, }, Capabilities: grpcClient, + remoteConn: remoteConn, }, nil } -func (c *Configuration) Close() error { - var errs []error +func (c *ClientPluginServices) Close() error { if c.remoteConn != nil { - errs = append(errs, c.remoteConn.Close()) + return c.remoteConn.Close() } - errs = append(errs, c.RemoteTLS.Close()) - return errors.Join(errs...) + return nil } diff --git a/plugin/storage/grpc/config/config_test.go b/plugin/storage/grpc/config/config_test.go index 64f54e58418..dc6bf298ac9 100644 --- a/plugin/storage/grpc/config/config_test.go +++ b/plugin/storage/grpc/config/config_test.go @@ -8,16 +8,17 @@ import ( "testing" "github.com/stretchr/testify/require" - "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "google.golang.org/grpc" ) func TestBuildRemoteNewClientError(t *testing.T) { // this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params. - c := &Configuration{} - _, err := c.buildRemote(zap.NewNop(), nil, func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + c := &ConfigV2{} + newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { return nil, errors.New("test error") - }) + } + _, err := newRemoteStorage(c, component.TelemetrySettings{}, newClientFn) require.Error(t, err) require.Contains(t, err.Error(), "error creating remote storage client") } diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index 00440f7f8e2..fd39208f750 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -15,6 +15,7 @@ package grpc import ( + "errors" "flag" "fmt" "io" @@ -27,7 +28,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" - "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -42,17 +42,14 @@ var ( // interface comformance checks // Factory implements storage.Factory and creates storage components backed by a storage plugin. type Factory struct { - options Options metricsFactory metrics.Factory logger *zap.Logger tracerProvider trace.TracerProvider - builder config.PluginBuilder + configV1 config.Configuration + configV2 *config.ConfigV2 - store shared.StoragePlugin - archiveStore shared.ArchiveStoragePlugin - streamingSpanWriter shared.StreamingSpanWriterPlugin - capabilities shared.PluginCapabilities + services *config.ClientPluginServices } // NewFactory creates a new Factory. @@ -62,14 +59,13 @@ func NewFactory() *Factory { // NewFactoryWithConfig is used from jaeger(v2). func NewFactoryWithConfig( - cfg config.Configuration, + cfg config.ConfigV2, metricsFactory metrics.Factory, logger *zap.Logger, ) (*Factory, error) { f := NewFactory() - f.configureFromOptions(Options{Configuration: cfg}) - err := f.Initialize(metricsFactory, logger) - if err != nil { + f.configV2 = &cfg + if err := f.Initialize(metricsFactory, logger); err != nil { return nil, err } return f, nil @@ -77,21 +73,14 @@ func NewFactoryWithConfig( // AddFlags implements plugin.Configurable func (f *Factory) AddFlags(flagSet *flag.FlagSet) { - f.options.AddFlags(flagSet) + v1AddFlags(flagSet) } // InitFromViper implements plugin.Configurable func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { - if err := f.options.InitFromViper(v); err != nil { + if err := v1InitFromViper(&f.configV1, v); err != nil { logger.Fatal("unable to initialize gRPC storage factory", zap.Error(err)) } - f.builder = &f.options.Configuration -} - -// configureFromOptions initializes factory from options -func (f *Factory) configureFromOptions(opts Options) { - f.options = opts - f.builder = &f.options.Configuration } // Initialize implements storage.Factory @@ -99,71 +88,75 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.metricsFactory, f.logger = metricsFactory, logger f.tracerProvider = otel.GetTracerProvider() - services, err := f.builder.Build(logger, f.tracerProvider) + if f.configV2 == nil { + f.configV2 = f.configV1.TranslateToConfigV2() + } + + var err error + f.services, err = f.configV2.Build(logger, f.tracerProvider) if err != nil { return fmt.Errorf("grpc storage builder failed to create a store: %w", err) } - - f.store = services.Store - f.archiveStore = services.ArchiveStore - f.capabilities = services.Capabilities - f.streamingSpanWriter = services.StreamingSpanWriter - logger.Info("Remote storage configuration", zap.Any("configuration", f.options.Configuration)) + logger.Info("Remote storage configuration", zap.Any("configuration", f.configV2)) return nil } // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return f.store.SpanReader(), nil + return f.services.Store.SpanReader(), nil } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - if f.capabilities != nil && f.streamingSpanWriter != nil { - if capabilities, err := f.capabilities.Capabilities(); err == nil && capabilities.StreamingSpanWriter { - return f.streamingSpanWriter.StreamingSpanWriter(), nil + if f.services.Capabilities != nil && f.services.StreamingSpanWriter != nil { + if capabilities, err := f.services.Capabilities.Capabilities(); err == nil && capabilities.StreamingSpanWriter { + return f.services.StreamingSpanWriter.StreamingSpanWriter(), nil } } - return f.store.SpanWriter(), nil + return f.services.Store.SpanWriter(), nil } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return f.store.DependencyReader(), nil + return f.services.Store.DependencyReader(), nil } // CreateArchiveSpanReader implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if f.capabilities == nil { + if f.services.Capabilities == nil { return nil, storage.ErrArchiveStorageNotSupported } - capabilities, err := f.capabilities.Capabilities() + capabilities, err := f.services.Capabilities.Capabilities() if err != nil { return nil, err } if capabilities == nil || !capabilities.ArchiveSpanReader { return nil, storage.ErrArchiveStorageNotSupported } - return f.archiveStore.ArchiveSpanReader(), nil + return f.services.ArchiveStore.ArchiveSpanReader(), nil } // CreateArchiveSpanWriter implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if f.capabilities == nil { + if f.services.Capabilities == nil { return nil, storage.ErrArchiveStorageNotSupported } - capabilities, err := f.capabilities.Capabilities() + capabilities, err := f.services.Capabilities.Capabilities() if err != nil { return nil, err } if capabilities == nil || !capabilities.ArchiveSpanWriter { return nil, storage.ErrArchiveStorageNotSupported } - return f.archiveStore.ArchiveSpanWriter(), nil + return f.services.ArchiveStore.ArchiveSpanWriter(), nil } // Close closes the resources held by the factory func (f *Factory) Close() error { - // TODO Close should move into Services type, instead of being in the Config. - return f.builder.Close() + var errs []error + if f.services != nil { + errs = append(errs, f.services.Close()) + } + errs = append(errs, f.configV1.RemoteTLS.Close()) + return errors.Join(errs...) } diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index 71270cdc4ff..dfc32f40612 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -15,6 +15,7 @@ package grpc import ( + "context" "errors" "log" "net" @@ -23,8 +24,11 @@ import ( "github.com/spf13/viper" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/collector/config/configauth" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/zap" "google.golang.org/grpc" @@ -40,111 +44,109 @@ import ( spanStoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) -type mockPluginBuilder struct { - plugin *mockPlugin - writerType string - err error +type store struct { + reader spanstore.Reader + writer spanstore.Writer + deps dependencystore.Reader } -func (b *mockPluginBuilder) Build(logger *zap.Logger, tracer trace.TracerProvider) (*grpcConfig.ClientPluginServices, error) { - if b.err != nil { - return nil, b.err - } - - services := &grpcConfig.ClientPluginServices{ - PluginServices: shared.PluginServices{ - Store: b.plugin, - ArchiveStore: b.plugin, - }, - } - if b.writerType == "streaming" { - services.PluginServices.StreamingSpanWriter = b.plugin - } - if b.plugin.capabilities != nil { - services.Capabilities = b.plugin - } - - return services, nil +func (s *store) SpanReader() spanstore.Reader { + return s.reader } -func (b *mockPluginBuilder) Close() error { - return nil +func (s *store) SpanWriter() spanstore.Writer { + return s.writer } -type mockPlugin struct { - spanReader spanstore.Reader - spanWriter spanstore.Writer - archiveReader spanstore.Reader - archiveWriter spanstore.Writer - streamingSpanWriter spanstore.Writer - capabilities shared.PluginCapabilities - dependencyReader dependencystore.Reader +func (s *store) ArchiveSpanReader() spanstore.Reader { + return s.reader } -func (mp *mockPlugin) Capabilities() (*shared.Capabilities, error) { - return mp.capabilities.Capabilities() +func (s *store) ArchiveSpanWriter() spanstore.Writer { + return s.writer } -func (mp *mockPlugin) ArchiveSpanReader() spanstore.Reader { - return mp.archiveReader +func (s *store) DependencyReader() dependencystore.Reader { + return s.deps } -func (mp *mockPlugin) ArchiveSpanWriter() spanstore.Writer { - return mp.archiveWriter +func (s *store) StreamingSpanWriter() spanstore.Writer { + return s.writer } -func (mp *mockPlugin) SpanReader() spanstore.Reader { - return mp.spanReader +func makeMockServices() *grpcConfig.ClientPluginServices { + return &grpcConfig.ClientPluginServices{ + PluginServices: shared.PluginServices{ + Store: &store{ + writer: new(spanStoreMocks.Writer), + reader: new(spanStoreMocks.Reader), + deps: new(dependencyStoreMocks.Reader), + }, + ArchiveStore: &store{ + writer: new(spanStoreMocks.Writer), + reader: new(spanStoreMocks.Reader), + }, + StreamingSpanWriter: &store{ + writer: new(spanStoreMocks.Writer), + }, + }, + Capabilities: new(mocks.PluginCapabilities), + } } -func (mp *mockPlugin) SpanWriter() spanstore.Writer { - return mp.spanWriter -} +func makeFactory(t *testing.T) *Factory { + f := NewFactory() + f.InitFromViper(viper.New(), zap.NewNop()) + require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) -func (mp *mockPlugin) StreamingSpanWriter() spanstore.Writer { - return mp.streamingSpanWriter -} + keepServices := f.services + t.Cleanup(func() { + keepServices.Close() + f.Close() + }) -func (mp *mockPlugin) DependencyReader() dependencystore.Reader { - return mp.dependencyReader + f.services = makeMockServices() + return f } -func TestGRPCStorageFactory(t *testing.T) { - f := NewFactory() - v := viper.New() - f.InitFromViper(v, zap.NewNop()) - - // after InitFromViper, f.builder points to a real plugin builder that will fail in unit tests, - // so we override it with a mock. - f.builder = &mockPluginBuilder{ - err: errors.New("made-up error"), - } - err := f.Initialize(metrics.NullFactory, zap.NewNop()) - require.Error(t, err) - assert.Contains(t, err.Error(), "made-up error") - - f.builder = &mockPluginBuilder{ - plugin: &mockPlugin{ - spanWriter: new(spanStoreMocks.Writer), - spanReader: new(spanStoreMocks.Reader), - archiveWriter: new(spanStoreMocks.Writer), - archiveReader: new(spanStoreMocks.Reader), - capabilities: new(mocks.PluginCapabilities), - dependencyReader: new(dependencyStoreMocks.Reader), +func TestNewFactoryError(t *testing.T) { + cfg := &grpcConfig.ConfigV2{ + ClientConfig: configgrpc.ClientConfig{ + // non-empty Auth is currently not supported + Auth: &configauth.Authentication{}, }, } - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + t.Run("with_config", func(t *testing.T) { + _, err := NewFactoryWithConfig(*cfg, metrics.NullFactory, zap.NewNop()) + require.Error(t, err) + assert.Contains(t, err.Error(), "authenticator") + }) + + t.Run("viper", func(t *testing.T) { + f := NewFactory() + f.InitFromViper(viper.New(), zap.NewNop()) + f.configV2 = cfg + err := f.Initialize(metrics.NullFactory, zap.NewNop()) + require.Error(t, err) + assert.Contains(t, err.Error(), "authenticator") + }) +} + +func TestInitFactory(t *testing.T) { + f := makeFactory(t) + f.services.Capabilities = nil - assert.NotNil(t, f.store) reader, err := f.CreateSpanReader() require.NoError(t, err) - assert.Equal(t, f.store.SpanReader(), reader) + assert.Equal(t, f.services.Store.SpanReader(), reader) + writer, err := f.CreateSpanWriter() require.NoError(t, err) - assert.Equal(t, f.store.SpanWriter(), writer) + assert.Equal(t, f.services.Store.SpanWriter(), writer) + depReader, err := f.CreateDependencyReader() require.NoError(t, err) - assert.Equal(t, f.store.DependencyReader(), depReader) + assert.Equal(t, f.services.Store.DependencyReader(), depReader) } func TestGRPCStorageFactoryWithConfig(t *testing.T) { @@ -159,9 +161,13 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) { }() defer s.Stop() - cfg := grpcConfig.Configuration{ - RemoteServerAddr: lis.Addr().String(), - RemoteConnectTimeout: 1 * time.Second, + cfg := grpcConfig.ConfigV2{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: lis.Addr().String(), + }, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 1 * time.Second, + }, } f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) @@ -169,11 +175,9 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) { } func TestGRPCStorageFactory_Capabilities(t *testing.T) { - f := NewFactory() - v := viper.New() - f.InitFromViper(v, zap.NewNop()) + f := makeFactory(t) - capabilities := new(mocks.PluginCapabilities) + capabilities := f.services.Capabilities.(*mocks.PluginCapabilities) capabilities.On("Capabilities"). Return(&shared.Capabilities{ ArchiveSpanReader: true, @@ -181,53 +185,30 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) { StreamingSpanWriter: true, }, nil).Times(3) - f.builder = &mockPluginBuilder{ - plugin: &mockPlugin{ - capabilities: capabilities, - archiveWriter: new(spanStoreMocks.Writer), - archiveReader: new(spanStoreMocks.Reader), - streamingSpanWriter: new(spanStoreMocks.Writer), - }, - writerType: "streaming", - } - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - - assert.NotNil(t, f.store) reader, err := f.CreateArchiveSpanReader() require.NoError(t, err) assert.NotNil(t, reader) + writer, err := f.CreateArchiveSpanWriter() require.NoError(t, err) assert.NotNil(t, writer) + writer, err = f.CreateSpanWriter() require.NoError(t, err) - assert.Equal(t, f.streamingSpanWriter.StreamingSpanWriter(), writer) + assert.NotNil(t, writer) } func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) { - f := NewFactory() - v := viper.New() - f.InitFromViper(v, zap.NewNop()) + f := makeFactory(t) - capabilities := new(mocks.PluginCapabilities) + capabilities := f.services.Capabilities.(*mocks.PluginCapabilities) capabilities.On("Capabilities"). Return(&shared.Capabilities{ ArchiveSpanReader: false, ArchiveSpanWriter: false, StreamingSpanWriter: false, - }, nil) - - f.builder = &mockPluginBuilder{ - plugin: &mockPlugin{ - capabilities: capabilities, - archiveWriter: new(spanStoreMocks.Writer), - archiveReader: new(spanStoreMocks.Reader), - spanWriter: new(spanStoreMocks.Writer), - }, - } - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + }, nil).Times(3) - assert.NotNil(t, f.store) reader, err := f.CreateArchiveSpanReader() require.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error()) assert.Nil(t, reader) @@ -236,30 +217,16 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) { assert.Nil(t, writer) writer, err = f.CreateSpanWriter() require.NoError(t, err) - assert.Equal(t, f.store.SpanWriter(), writer) + assert.NotNil(t, writer, "regular span writer is available") } func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) { - f := NewFactory() - v := viper.New() - f.InitFromViper(v, zap.NewNop()) + f := makeFactory(t) - capabilities := new(mocks.PluginCapabilities) + capabilities := f.services.Capabilities.(*mocks.PluginCapabilities) customError := errors.New("made-up error") - capabilities.On("Capabilities"). - Return(nil, customError) - - f.builder = &mockPluginBuilder{ - plugin: &mockPlugin{ - capabilities: capabilities, - archiveWriter: new(spanStoreMocks.Writer), - archiveReader: new(spanStoreMocks.Reader), - spanWriter: new(spanStoreMocks.Writer), - }, - } - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + capabilities.On("Capabilities").Return(nil, customError) - assert.NotNil(t, f.store) reader, err := f.CreateArchiveSpanReader() require.EqualError(t, err, customError.Error()) assert.Nil(t, reader) @@ -268,24 +235,13 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) { assert.Nil(t, writer) writer, err = f.CreateSpanWriter() require.NoError(t, err) - assert.Equal(t, f.store.SpanWriter(), writer) + assert.NotNil(t, writer, "regular span writer is available") } func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) { - f := NewFactory() - v := viper.New() - f.InitFromViper(v, zap.NewNop()) - - f.builder = &mockPluginBuilder{ - plugin: &mockPlugin{ - archiveWriter: new(spanStoreMocks.Writer), - archiveReader: new(spanStoreMocks.Reader), - spanWriter: new(spanStoreMocks.Writer), - }, - } - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + f := makeFactory(t) + f.services.Capabilities = nil - assert.NotNil(t, f.store) reader, err := f.CreateArchiveSpanReader() assert.Equal(t, err, storage.ErrArchiveStorageNotSupported) assert.Nil(t, reader) @@ -294,10 +250,10 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) { assert.Nil(t, writer) writer, err = f.CreateSpanWriter() require.NoError(t, err) - assert.Equal(t, f.store.SpanWriter(), writer) + assert.NotNil(t, writer, "regular span writer is available") } -func TestWithConfiguration(t *testing.T) { +func TestWithCLIFlags(t *testing.T) { f := NewFactory() v, command := config.Viperize(f.AddFlags) err := command.ParseFlags([]string{ @@ -305,67 +261,59 @@ func TestWithConfiguration(t *testing.T) { }) require.NoError(t, err) f.InitFromViper(v, zap.NewNop()) - assert.Equal(t, "foo:1234", f.options.Configuration.RemoteServerAddr) + assert.Equal(t, "foo:1234", f.configV1.RemoteServerAddr) require.NoError(t, f.Close()) } -func TestConfigureFromOptions(t *testing.T) { - f := Factory{} - o := Options{ - Configuration: grpcConfig.Configuration{ - RemoteServerAddr: "foo:1234", - }, - } - f.configureFromOptions(o) - assert.Equal(t, o, f.options) - assert.Equal(t, &o.Configuration, f.builder) -} - func TestStreamingSpanWriterFactory_CapabilitiesNil(t *testing.T) { - f := NewFactory() - v := viper.New() - f.InitFromViper(v, zap.NewNop()) + f := makeFactory(t) + + f.services.Capabilities = nil + mockWriter := f.services.Store.SpanWriter().(*spanStoreMocks.Writer) + mockWriter.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("not streaming writer")) + mockWriter2 := f.services.StreamingSpanWriter.StreamingSpanWriter().(*spanStoreMocks.Writer) + mockWriter2.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("I am streaming writer")) - f.builder = &mockPluginBuilder{ - plugin: &mockPlugin{ - archiveWriter: new(spanStoreMocks.Writer), - archiveReader: new(spanStoreMocks.Reader), - spanWriter: new(spanStoreMocks.Writer), - }, - writerType: "streaming", - } - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) writer, err := f.CreateSpanWriter() require.NoError(t, err) - assert.Equal(t, f.store.SpanWriter(), writer) + err = writer.WriteSpan(context.Background(), nil) + require.Error(t, err) + require.Contains(t, err.Error(), "not streaming writer") } func TestStreamingSpanWriterFactory_Capabilities(t *testing.T) { - f := NewFactory() - v := viper.New() - f.InitFromViper(v, zap.NewNop()) + f := makeFactory(t) - capabilities := new(mocks.PluginCapabilities) + capabilities := f.services.Capabilities.(*mocks.PluginCapabilities) customError := errors.New("made-up error") - capabilities.On("Capabilities"). - Return(nil, customError).Once(). - On("Capabilities").Return(&shared.Capabilities{}, nil).Once() - - f.builder = &mockPluginBuilder{ - plugin: &mockPlugin{ - archiveWriter: new(spanStoreMocks.Writer), - archiveReader: new(spanStoreMocks.Reader), - spanWriter: new(spanStoreMocks.Writer), - capabilities: capabilities, - }, - writerType: "streaming", - } - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + capabilities. + // return error on the first call + On("Capabilities").Return(nil, customError).Once(). + // then return false on the second call + On("Capabilities").Return(&shared.Capabilities{}, nil).Once(). + // then return true on the second call + On("Capabilities").Return(&shared.Capabilities{StreamingSpanWriter: true}, nil).Once() + + mockWriter := f.services.Store.SpanWriter().(*spanStoreMocks.Writer) + mockWriter.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("not streaming writer")) + mockWriter2 := f.services.StreamingSpanWriter.StreamingSpanWriter().(*spanStoreMocks.Writer) + mockWriter2.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("I am streaming writer")) + writer, err := f.CreateSpanWriter() require.NoError(t, err) - assert.Equal(t, f.store.SpanWriter(), writer) // get unary writer when Capabilities return error + err = writer.WriteSpan(context.Background(), nil) + require.Error(t, err) + require.Contains(t, err.Error(), "not streaming writer", "unary writer when Capabilities return error") + + writer, err = f.CreateSpanWriter() + require.NoError(t, err) + err = writer.WriteSpan(context.Background(), nil) + require.Error(t, err) + require.Contains(t, err.Error(), "not streaming writer", "unary writer when Capabilities return false") writer, err = f.CreateSpanWriter() require.NoError(t, err) - assert.Equal(t, f.store.SpanWriter(), writer) // get unary writer when Capabilities return false + err = writer.WriteSpan(context.Background(), nil) + require.Error(t, err) + require.Contains(t, err.Error(), "I am streaming writer", "streaming writer when Capabilities return true") } diff --git a/plugin/storage/grpc/options.go b/plugin/storage/grpc/options.go index bfefabd43a6..b70290baf23 100644 --- a/plugin/storage/grpc/options.go +++ b/plugin/storage/grpc/options.go @@ -33,12 +33,6 @@ const ( defaultConnectionTimeout = time.Duration(5 * time.Second) ) -// Options contains GRPC plugins configs and provides the ability -// to bind them to command line flags -type Options struct { - Configuration config.Configuration `mapstructure:",squash"` -} - func tlsFlagsConfig() tlscfg.ClientFlagsConfig { return tlscfg.ClientFlagsConfig{ Prefix: remotePrefix, @@ -46,22 +40,21 @@ func tlsFlagsConfig() tlscfg.ClientFlagsConfig { } // AddFlags adds flags for Options -func (opt *Options) AddFlags(flagSet *flag.FlagSet) { +func v1AddFlags(flagSet *flag.FlagSet) { tlsFlagsConfig().AddFlags(flagSet) flagSet.String(remoteServer, "", "The remote storage gRPC server address as host:port") flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote storage gRPC server connection timeout") } -// InitFromViper initializes Options with properties from viper -func (opt *Options) InitFromViper(v *viper.Viper) error { - opt.Configuration.RemoteServerAddr = v.GetString(remoteServer) +func v1InitFromViper(cfg *config.Configuration, v *viper.Viper) error { + cfg.RemoteServerAddr = v.GetString(remoteServer) var err error - opt.Configuration.RemoteTLS, err = tlsFlagsConfig().InitFromViper(v) + cfg.RemoteTLS, err = tlsFlagsConfig().InitFromViper(v) if err != nil { return fmt.Errorf("failed to parse gRPC storage TLS options: %w", err) } - opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout) - opt.Configuration.TenancyOpts = tenancy.InitFromViper(v) + cfg.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout) + cfg.TenancyOpts = tenancy.InitFromViper(v) return nil } diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go index dce4e35d3ad..4d1670e97c2 100644 --- a/plugin/storage/grpc/options_test.go +++ b/plugin/storage/grpc/options_test.go @@ -23,64 +23,65 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/tenancy" + grpccfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" ) func TestOptionsWithFlags(t *testing.T) { - opts := &Options{} - v, command := config.Viperize(opts.AddFlags, tenancy.AddFlags) + v, command := config.Viperize(v1AddFlags, tenancy.AddFlags) err := command.ParseFlags([]string{ "--grpc-storage.server=foo:12345", "--multi-tenancy.header=x-scope-orgid", }) require.NoError(t, err) - opts.InitFromViper(v) + var cfg grpccfg.Configuration + require.NoError(t, v1InitFromViper(&cfg, v)) - assert.Equal(t, "foo:12345", opts.Configuration.RemoteServerAddr) - assert.False(t, opts.Configuration.TenancyOpts.Enabled) - assert.Equal(t, "x-scope-orgid", opts.Configuration.TenancyOpts.Header) + assert.Equal(t, "foo:12345", cfg.RemoteServerAddr) + assert.False(t, cfg.TenancyOpts.Enabled) + assert.Equal(t, "x-scope-orgid", cfg.TenancyOpts.Header) } func TestRemoteOptionsWithFlags(t *testing.T) { - opts := &Options{} - v, command := config.Viperize(opts.AddFlags) + v, command := config.Viperize(v1AddFlags) err := command.ParseFlags([]string{ "--grpc-storage.server=localhost:2001", "--grpc-storage.tls.enabled=true", "--grpc-storage.connection-timeout=60s", }) require.NoError(t, err) - opts.InitFromViper(v) + var cfg grpccfg.Configuration + require.NoError(t, v1InitFromViper(&cfg, v)) - assert.Equal(t, "localhost:2001", opts.Configuration.RemoteServerAddr) - assert.True(t, opts.Configuration.RemoteTLS.Enabled) - assert.Equal(t, 60*time.Second, opts.Configuration.RemoteConnectTimeout) + assert.Equal(t, "localhost:2001", cfg.RemoteServerAddr) + assert.True(t, cfg.RemoteTLS.Enabled) + assert.Equal(t, 60*time.Second, cfg.RemoteConnectTimeout) } func TestRemoteOptionsNoTLSWithFlags(t *testing.T) { - opts := &Options{} - v, command := config.Viperize(opts.AddFlags) + v, command := config.Viperize(v1AddFlags) err := command.ParseFlags([]string{ "--grpc-storage.server=localhost:2001", "--grpc-storage.tls.enabled=false", "--grpc-storage.connection-timeout=60s", }) require.NoError(t, err) - opts.InitFromViper(v) + var cfg grpccfg.Configuration + require.NoError(t, v1InitFromViper(&cfg, v)) - assert.Equal(t, "localhost:2001", opts.Configuration.RemoteServerAddr) - assert.False(t, opts.Configuration.RemoteTLS.Enabled) - assert.Equal(t, 60*time.Second, opts.Configuration.RemoteConnectTimeout) + assert.Equal(t, "localhost:2001", cfg.RemoteServerAddr) + assert.False(t, cfg.RemoteTLS.Enabled) + assert.Equal(t, 60*time.Second, cfg.RemoteConnectTimeout) } func TestFailedTLSFlags(t *testing.T) { - opts := &Options{} - v, command := config.Viperize(opts.AddFlags) + v, command := config.Viperize(v1AddFlags) err := command.ParseFlags([]string{ "--grpc-storage.tls.enabled=false", "--grpc-storage.tls.cert=blah", // invalid unless tls.enabled=true }) require.NoError(t, err) - err = opts.InitFromViper(v) + var cfg grpccfg.Configuration + err = v1InitFromViper(&cfg, v) require.Error(t, err) assert.Contains(t, err.Error(), "failed to parse gRPC storage TLS options") }