diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 3ead38258c2..a7dffefe2df 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -52,6 +52,7 @@ extensions: password: "cassandra" tls: insecure: true + is_archive: true receivers: otlp: protocols: diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 4654a2a2c82..f7f95a83465 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -40,7 +40,6 @@ const ( var ( // interface comformance checks _ storage.Factory = (*Factory)(nil) _ storage.Purger = (*Factory)(nil) - _ storage.ArchiveFactory = (*Factory)(nil) _ storage.SamplingStoreFactory = (*Factory)(nil) _ io.Closer = (*Factory)(nil) _ plugin.Configurable = (*Factory)(nil) @@ -50,22 +49,26 @@ var ( // interface comformance checks type Factory struct { Options *Options - primaryMetricsFactory metrics.Factory - archiveMetricsFactory metrics.Factory - logger *zap.Logger - tracer trace.TracerProvider + metricsFactory metrics.Factory + logger *zap.Logger + tracer trace.TracerProvider - primaryConfig config.SessionBuilder - primarySession cassandra.Session - archiveConfig config.SessionBuilder - archiveSession cassandra.Session + config config.SessionBuilder + session cassandra.Session } // NewFactory creates a new Factory. -func NewFactory() *Factory { +func NewFactory(isArchive bool) *Factory { + var options *Options + if isArchive { + options = NewOptions(archiveStorageConfig) + options.IsArchive = true + } else { + options = NewOptions(primaryStorageConfig) + } return &Factory{ tracer: otel.GetTracerProvider(), - Options: NewOptions(primaryStorageConfig, archiveStorageConfig), + Options: options, } } @@ -75,7 +78,7 @@ func NewFactoryWithConfig( metricsFactory metrics.Factory, logger *zap.Logger, ) (*Factory, error) { - f := NewFactory() + f := NewFactory(opts.IsArchive) // use this to help with testing b := &withConfigBuilder{ f: f, @@ -121,43 +124,30 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { // InitFromOptions initializes factory from options. func (f *Factory) configureFromOptions(o *Options) { f.Options = o - // TODO this is a hack because we do not define defaults in Options - if o.others == nil { - o.others = make(map[string]*NamespaceConfig) - } - f.primaryConfig = o.GetPrimary() - if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { - f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error - } + f.config = o.GetPrimary() } // Initialize implements storage.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil}) - f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) + metricsNamespace := "cassandra" + if f.Options.IsArchive { + metricsNamespace = "cassandra-archive" + } + f.metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: metricsNamespace, Tags: nil}) f.logger = logger - primarySession, err := f.primaryConfig.NewSession() + primarySession, err := f.config.NewSession() if err != nil { return err } - f.primarySession = primarySession - - if f.archiveConfig != nil { - archiveSession, err := f.archiveConfig.NewSession() - if err != nil { - return err - } - f.archiveSession = archiveSession - } else { - logger.Info("Cassandra archive storage configuration is empty, skipping") - } + f.session = primarySession + return nil } // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) + return cSpanStore.NewSpanReader(f.session, f.metricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) } // CreateSpanWriter implements storage.Factory @@ -166,33 +156,13 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { if err != nil { return nil, err } - return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...) + return cSpanStore.NewSpanWriter(f.session, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - version := cDepStore.GetDependencyVersion(f.primarySession) - return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version) -} - -// CreateArchiveSpanReader implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if f.archiveSession == nil { - return nil, storage.ErrArchiveStorageNotConfigured - } - return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) -} - -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if f.archiveSession == nil { - return nil, storage.ErrArchiveStorageNotConfigured - } - options, err := writerOptions(f.Options) - if err != nil { - return nil, err - } - return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...) + version := cDepStore.GetDependencyVersion(f.session) + return cDepStore.NewDependencyStore(f.session, f.metricsFactory, f.logger, version) } // CreateLock implements storage.SamplingStoreFactory @@ -203,12 +173,12 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) { } f.logger.Info("Using unique participantName in the distributed lock", zap.String("participantName", hostId)) - return cLock.NewLock(f.primarySession, hostId), nil + return cLock.NewLock(f.session, hostId), nil } // CreateSamplingStore implements storage.SamplingStoreFactory func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { - return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil + return cSamplingStore.New(f.session, f.metricsFactory, f.logger), nil } func writerOptions(opts *Options) ([]cSpanStore.Option, error) { @@ -244,16 +214,13 @@ var _ io.Closer = (*Factory)(nil) // Close closes the resources held by the factory func (f *Factory) Close() error { - if f.primarySession != nil { - f.primarySession.Close() - } - if f.archiveSession != nil { - f.archiveSession.Close() + if f.session != nil { + f.session.Close() } return nil } func (f *Factory) Purge(_ context.Context) error { - return f.primarySession.Query("TRUNCATE traces").Exec() + return f.session.Query("TRUNCATE traces").Exec() } diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 9a2fcc486cf..a236ec5b3b2 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -39,15 +39,14 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { } func TestCassandraFactory(t *testing.T) { - logger, logBuf := testutils.NewLogger() - f := NewFactory() - v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{"--cassandra-archive.enabled=true"}) + logger, _ := testutils.NewLogger() + f := NewFactory(false) + v, _ := config.Viperize(f.AddFlags) f.InitFromViper(v, zap.NewNop()) // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") var ( @@ -57,13 +56,11 @@ func TestCassandraFactory(t *testing.T) { session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) session.On("Close").Return() query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - f.archiveConfig = nil + f.config = newMockSessionBuilder(session, nil) require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") _, err := f.CreateSpanReader() require.NoError(t, err) @@ -74,21 +71,6 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateDependencyReader() require.NoError(t, err) - _, err = f.CreateArchiveSpanReader() - require.EqualError(t, err, "archive storage not configured") - - _, err = f.CreateArchiveSpanWriter() - require.EqualError(t, err, "archive storage not configured") - - f.archiveConfig = newMockSessionBuilder(session, nil) - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - - _, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - - _, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) - _, err = f.CreateLock() require.NoError(t, err) @@ -99,11 +81,9 @@ func TestCassandraFactory(t *testing.T) { } func TestExclusiveWhitelistBlacklist(t *testing.T) { - logger, logBuf := testutils.NewLogger() - f := NewFactory() + f := NewFactory(false) v, command := config.Viperize(f.AddFlags) command.ParseFlags([]string{ - "--cassandra-archive.enabled=true", "--cassandra.index.tag-whitelist=a,b,c", "--cassandra.index.tag-blacklist=a,b,c", }) @@ -111,7 +91,7 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) + f.config = newMockSessionBuilder(nil, errors.New("made-up error")) require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") var ( @@ -120,22 +100,10 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { ) session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - - f.archiveConfig = nil - require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") + f.config = newMockSessionBuilder(session, nil) _, err := f.CreateSpanWriter() require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") - - f.archiveConfig = &mockSessionBuilder{} - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - - _, err = f.CreateArchiveSpanWriter() - require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") } func TestWriterOptions(t *testing.T) { @@ -181,13 +149,11 @@ func TestWriterOptions(t *testing.T) { } func TestConfigureFromOptions(t *testing.T) { - f := NewFactory() - o := NewOptions("foo", archiveStorageConfig) - o.others[archiveStorageConfig].Enabled = true + f := NewFactory(false) + o := NewOptions("foo") f.configureFromOptions(o) assert.Equal(t, o, f.Options) - assert.Equal(t, o.GetPrimary(), f.primaryConfig) - assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig) + assert.Equal(t, o.GetPrimary(), f.config) } func TestNewFactoryWithConfig(t *testing.T) { @@ -201,7 +167,7 @@ func TestNewFactoryWithConfig(t *testing.T) { }, }, } - f := NewFactory() + f := NewFactory(false) b := &withConfigBuilder{ f: f, opts: opts, @@ -223,7 +189,7 @@ func TestNewFactoryWithConfig(t *testing.T) { }, }, } - f := NewFactory() + f := NewFactory(false) b := &withConfigBuilder{ f: f, opts: opts, @@ -242,14 +208,14 @@ func TestNewFactoryWithConfig(t *testing.T) { } func TestFactory_Purge(t *testing.T) { - f := NewFactory() + f := NewFactory(false) var ( session = &mocks.Session{} query = &mocks.Query{} ) session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) query.On("Exec").Return(nil) - f.primarySession = session + f.session = session err := f.Purge(context.Background()) require.NoError(t, err) diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index e266436ba29..e5eeb0b6773 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -49,9 +49,10 @@ const ( // (e.g. archive) may be underspecified and infer the rest of its parameters from primary. type Options struct { Primary NamespaceConfig `mapstructure:",squash"` - others map[string]*NamespaceConfig - SpanStoreWriteCacheTTL time.Duration `mapstructure:"span_store_write_cache_ttl"` - Index IndexConfig `mapstructure:"index"` + SpanStoreWriteCacheTTL time.Duration `mapstructure:"span_store_write_cache_ttl"` + Index IndexConfig `mapstructure:"index"` + // TODO: this is just a placeholder + IsArchive bool `mapstructure:"is_archive"` } // IndexConfig configures indexing. @@ -74,31 +75,22 @@ type NamespaceConfig struct { } // NewOptions creates a new Options struct. -func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { +func NewOptions(namespace string) *Options { // TODO all default values should be defined via cobra flags options := &Options{ Primary: NamespaceConfig{ Configuration: config.DefaultConfiguration(), - namespace: primaryNamespace, + namespace: namespace, Enabled: true, }, - others: make(map[string]*NamespaceConfig, len(otherNamespaces)), SpanStoreWriteCacheTTL: time.Hour * 12, } - - for _, namespace := range otherNamespaces { - options.others[namespace] = &NamespaceConfig{namespace: namespace} - } - return options } // AddFlags adds flags for Options func (opt *Options) AddFlags(flagSet *flag.FlagSet) { addFlags(flagSet, opt.Primary) - for _, cfg := range opt.others { - addFlags(flagSet, *cfg) - } flagSet.Duration(opt.Primary.namespace+suffixSpanStoreWriteCacheTTL, opt.SpanStoreWriteCacheTTL, "The duration to wait before rewriting an existing service or operation name") @@ -206,9 +198,6 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { opt.Primary.initFromViper(v) - for _, cfg := range opt.others { - cfg.initFromViper(v) - } opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.Primary.namespace + suffixSpanStoreWriteCacheTTL) opt.Index.TagBlackList = stripWhiteSpace(v.GetString(opt.Primary.namespace + suffixIndexTagsBlacklist)) opt.Index.TagWhiteList = stripWhiteSpace(v.GetString(opt.Primary.namespace + suffixIndexTagsWhitelist)) @@ -260,22 +249,17 @@ func (opt *Options) GetPrimary() *config.Configuration { return &opt.Primary.Configuration } -// Get returns auxiliary named configuration. -func (opt *Options) Get(namespace string) *config.Configuration { - nsCfg, ok := opt.others[namespace] - if !ok { - nsCfg = &NamespaceConfig{} - opt.others[namespace] = nsCfg - } - if !nsCfg.Enabled { - return nil - } - nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) - if len(nsCfg.Connection.Servers) == 0 { - nsCfg.Connection.Servers = opt.Primary.Connection.Servers - } - return &nsCfg.Configuration -} +// // Get returns auxiliary named configuration. +// func (opt *Options) Get(namespace string) *config.Configuration { +// if !nsCfg.Enabled { +// return nil +// } +// nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) +// if len(nsCfg.Connection.Servers) == 0 { +// nsCfg.Connection.Servers = opt.Primary.Connection.Servers +// } +// return &nsCfg.Configuration +// } // TagIndexBlacklist returns the list of blacklisted tags func (opt *Options) TagIndexBlacklist() []string { diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 8ccd813c838..418554b2666 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -6,10 +6,8 @@ package cassandra import ( "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" ) @@ -20,22 +18,10 @@ func TestOptions(t *testing.T) { assert.NotEmpty(t, primary.Schema.Keyspace) assert.NotEmpty(t, primary.Connection.Servers) assert.Equal(t, 2, primary.Connection.ConnectionsPerHost) - - aux := opts.Get("archive") - assert.Nil(t, aux) - - assert.NotNil(t, opts.others["archive"]) - opts.others["archive"].Enabled = true - aux = opts.Get("archive") - require.NotNil(t, aux) - assert.Equal(t, primary.Schema.Keyspace, aux.Schema.Keyspace) - assert.Equal(t, primary.Connection.Servers, aux.Connection.Servers) - assert.Equal(t, primary.Connection.ConnectionsPerHost, aux.Connection.ConnectionsPerHost) - assert.Equal(t, primary.Connection.ReconnectInterval, aux.Connection.ReconnectInterval) } func TestOptionsWithFlags(t *testing.T) { - opts := NewOptions("cas", "cas-aux") + opts := NewOptions("cas") v, command := config.Viperize(opts.AddFlags) command.ParseFlags([]string{ "--cas.keyspace=jaeger", @@ -56,13 +42,6 @@ func TestOptionsWithFlags(t *testing.T) { "--cas.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator,com.datastax.bdp.cassandra.auth.DseAuthenticator", "--cas.username=username", "--cas.password=password", - // enable aux with a couple overrides - "--cas-aux.enabled=true", - "--cas-aux.keyspace=jaeger-archive", - "--cas-aux.servers=3.3.3.3, 4.4.4.4", - "--cas-aux.username=username", - "--cas-aux.password=password", - "--cas-aux.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator,com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator", }) opts.InitFromViper(v) @@ -77,20 +56,6 @@ func TestOptionsWithFlags(t *testing.T) { assert.True(t, opts.Index.Tags) assert.False(t, opts.Index.ProcessTags) assert.True(t, opts.Index.Logs) - - aux := opts.Get("cas-aux") - require.NotNil(t, aux) - assert.Equal(t, "jaeger-archive", aux.Schema.Keyspace) - assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Connection.Servers) - assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Connection.Authenticator.Basic.AllowedAuthenticators) - assert.Equal(t, 42, aux.Connection.ConnectionsPerHost) - assert.Equal(t, 42, aux.Query.MaxRetryAttempts) - assert.Equal(t, 42*time.Second, aux.Query.Timeout) - assert.Equal(t, 42*time.Second, aux.Connection.ReconnectInterval) - assert.Equal(t, 4242, aux.Connection.Port) - assert.Equal(t, "", aux.Query.Consistency, "aux storage does not inherit consistency from primary") - assert.Equal(t, 3, aux.Connection.ProtoVersion) - assert.Equal(t, 42*time.Second, aux.Connection.SocketKeepAlive) } func TestDefaultTlsHostVerify(t *testing.T) { diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index 498339d49e4..4bb11fe0a56 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -116,7 +116,7 @@ func NewFactory(config FactoryConfig) (*Factory, error) { func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) { switch factoryType { case cassandraStorageType: - return cassandra.NewFactory(), nil + return cassandra.NewFactory(false), nil case elasticsearchStorageType, opensearchStorageType: return es.NewFactory(false), nil case memoryStorageType: diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index c8f88d01452..9e4f477b0b3 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -42,7 +42,7 @@ func (s *CassandraStorageIntegration) cleanUp(t *testing.T) { func (*CassandraStorageIntegration) initializeCassandraFactory(t *testing.T, flags []string) *cassandra.Factory { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - f := cassandra.NewFactory() + f := cassandra.NewFactory(false) v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags(flags)) f.InitFromViper(v, logger) @@ -58,12 +58,6 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { "--cassandra.password=" + password, "--cassandra.username=" + username, "--cassandra.keyspace=jaeger_v1_dc1", - "--cassandra-archive.keyspace=jaeger_v1_dc1_archive", - "--cassandra-archive.enabled=true", - "--cassandra-archive.servers=127.0.0.1", - "--cassandra-archive.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator", - "--cassandra-archive.password=" + password, - "--cassandra-archive.username=" + username, }) s.factory = f var err error @@ -71,10 +65,6 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { require.NoError(t, err) s.SpanReader, err = f.CreateSpanReader() require.NoError(t, err) - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - require.NoError(t, err) - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - require.NoError(t, err) s.SamplingStore, err = f.CreateSamplingStore(0) require.NoError(t, err) s.initializeDependencyReaderAndWriter(t, f)