diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 90983141cff..0076fa48fd0 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -22,14 +22,26 @@ extensions: backends: some_storage: cassandra: - keyspace: "jaeger_v1_dc1" - username: "cassandra" - password: "cassandra" + schema: + keyspace: "jaeger_v1_dc1" + connection: + auth: + basic: + username: "cassandra" + password: "cassandra" + tls: + insecure: true another_storage: cassandra: - keyspace: "jaeger_v1_dc1" - username: "cassandra" - password: "cassandra" + schema: + keyspace: "jaeger_v1_dc1" + connection: + auth: + basic: + username: "cassandra" + password: "cassandra" + tls: + insecure: true receivers: otlp: protocols: diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config_test.go b/cmd/jaeger/internal/extension/jaegerstorage/config_test.go index ca0b8735cdd..58a000051bd 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config_test.go @@ -83,7 +83,7 @@ backends: `) cfg := createDefaultConfig().(*Config) require.NoError(t, conf.Unmarshal(cfg)) - assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Servers) + assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Connection.Servers) } func TestConfigDefaultElasticsearch(t *testing.T) { diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 80f51db77e8..6bab9c75da4 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -5,99 +5,138 @@ package config import ( + "context" "fmt" "time" "github.com/asaskevich/govalidator" "github.com/gocql/gocql" - "go.uber.org/zap" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/pkg/cassandra" gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) -// Configuration describes the configuration properties needed to connect to a Cassandra cluster +// Configuration describes the configuration properties needed to connect to a Cassandra cluster. type Configuration struct { - Servers []string `valid:"required,url" mapstructure:"servers"` - Keyspace string `mapstructure:"keyspace"` - LocalDC string `mapstructure:"local_dc"` - ConnectionsPerHost int `mapstructure:"connections_per_host"` - Timeout time.Duration `mapstructure:"-"` - ConnectTimeout time.Duration `mapstructure:"connection_timeout"` - ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` - SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` - MaxRetryAttempts int `mapstructure:"max_retry_attempts"` - ProtoVersion int `mapstructure:"proto_version"` - Consistency string `mapstructure:"consistency"` - DisableCompression bool `mapstructure:"disable_compression"` - Port int `mapstructure:"port"` - Authenticator Authenticator `mapstructure:",squash"` - DisableAutoDiscovery bool `mapstructure:"-"` - TLS tlscfg.Options `mapstructure:"tls"` + Schema Schema `mapstructure:"schema"` + Connection Connection `mapstructure:"connection"` + Query Query `mapstructure:"query"` } -func DefaultConfiguration() Configuration { - return Configuration{ - Servers: []string{"127.0.0.1"}, - Port: 9042, - MaxRetryAttempts: 3, - Keyspace: "jaeger_v1_test", - ProtoVersion: 4, - ConnectionsPerHost: 2, - ReconnectInterval: 60 * time.Second, - } +type Connection struct { + // Servers contains a list of hosts that are used to connect to the cluster. + Servers []string `mapstructure:"servers" valid:"required,url"` + // LocalDC contains the name of the local Data Center (DC) for DC-aware host selection + LocalDC string `mapstructure:"local_dc"` + // The port used when dialing to a cluster. + Port int `mapstructure:"port"` + // DisableAutoDiscovery, if set to true, will disable the cluster's auto-discovery features. + DisableAutoDiscovery bool `mapstructure:"disable_auto_discovery"` + // ConnectionsPerHost contains the maximum number of open connections for each host on the cluster. + ConnectionsPerHost int `mapstructure:"connections_per_host"` + // ReconnectInterval contains the regular interval after which the driver tries to connect to + // nodes that are down. + ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` + // SocketKeepAlive contains the keep alive period for the default dialer to the cluster. + SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` + // TLS contains the TLS configuration for the connection to the cluster. + TLS configtls.ClientConfig `mapstructure:"tls"` + // Timeout contains the maximum time spent to connect to a cluster. + Timeout time.Duration `mapstructure:"timeout"` + // Authenticator contains the details of the authentication mechanism that is used for + // connecting to a cluster. + Authenticator Authenticator `mapstructure:"auth"` + // ProtoVersion contains the version of the native protocol to use when connecting to a cluster. + ProtoVersion int `mapstructure:"proto_version"` +} + +type Schema struct { + // Keyspace contains the namespace where Jaeger data will be stored. + Keyspace string `mapstructure:"keyspace"` + // DisableCompression, if set to true, disables the use of the default Snappy Compression + // while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB, + // that do not support SnappyCompression. + DisableCompression bool `mapstructure:"disable_compression"` } -// Authenticator holds the authentication properties needed to connect to a Cassandra cluster +type Query struct { + // Timeout contains the maximum time spent executing a query. + Timeout time.Duration `mapstructure:"timeout"` + // MaxRetryAttempts indicates the maximum number of times a query will be retried for execution. + MaxRetryAttempts int `mapstructure:"max_retry_attempts"` + // Consistency specifies the consistency level which needs to be satisified before responding + // to a query. + Consistency string `mapstructure:"consistency"` +} + +// Authenticator holds the authentication properties needed to connect to a Cassandra cluster. type Authenticator struct { - Basic BasicAuthenticator `yaml:"basic" mapstructure:",squash"` + Basic BasicAuthenticator `mapstructure:"basic"` // TODO: add more auth types } -// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster +// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster. type BasicAuthenticator struct { - Username string `yaml:"username" mapstructure:"username"` - Password string `yaml:"password" mapstructure:"password" json:"-"` - AllowedAuthenticators []string `yaml:"allowed_authenticators" mapstructure:"allowed_authenticators"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + AllowedAuthenticators []string `mapstructure:"allowed_authenticators"` +} + +func DefaultConfiguration() Configuration { + return Configuration{ + Schema: Schema{ + Keyspace: "jaeger_v1_test", + }, + Connection: Connection{ + Servers: []string{"127.0.0.1"}, + Port: 9042, + ProtoVersion: 4, + ConnectionsPerHost: 2, + ReconnectInterval: 60 * time.Second, + }, + Query: Query{ + MaxRetryAttempts: 3, + }, + } } // ApplyDefaults copies settings from source unless its own value is non-zero. func (c *Configuration) ApplyDefaults(source *Configuration) { - if c.ConnectionsPerHost == 0 { - c.ConnectionsPerHost = source.ConnectionsPerHost + if c.Schema.Keyspace == "" { + c.Schema.Keyspace = source.Schema.Keyspace } - if c.MaxRetryAttempts == 0 { - c.MaxRetryAttempts = source.MaxRetryAttempts + if c.Connection.ConnectionsPerHost == 0 { + c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost } - if c.Timeout == 0 { - c.Timeout = source.Timeout + if c.Connection.ReconnectInterval == 0 { + c.Connection.ReconnectInterval = source.Connection.ReconnectInterval } - if c.ReconnectInterval == 0 { - c.ReconnectInterval = source.ReconnectInterval + if c.Connection.Port == 0 { + c.Connection.Port = source.Connection.Port } - if c.Port == 0 { - c.Port = source.Port + if c.Connection.ProtoVersion == 0 { + c.Connection.ProtoVersion = source.Connection.ProtoVersion } - if c.Keyspace == "" { - c.Keyspace = source.Keyspace + if c.Connection.SocketKeepAlive == 0 { + c.Connection.SocketKeepAlive = source.Connection.SocketKeepAlive } - if c.ProtoVersion == 0 { - c.ProtoVersion = source.ProtoVersion + if c.Query.MaxRetryAttempts == 0 { + c.Query.MaxRetryAttempts = source.Query.MaxRetryAttempts } - if c.SocketKeepAlive == 0 { - c.SocketKeepAlive = source.SocketKeepAlive + if c.Query.Timeout == 0 { + c.Query.Timeout = source.Query.Timeout } } // SessionBuilder creates new cassandra.Session type SessionBuilder interface { - NewSession(logger *zap.Logger) (cassandra.Session, error) + NewSession() (cassandra.Session, error) } // NewSession creates a new Cassandra session -func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error) { - cluster, err := c.NewCluster(logger) +func (c *Configuration) NewSession() (cassandra.Session, error) { + cluster, err := c.NewCluster() if err != nil { return nil, err } @@ -109,68 +148,64 @@ func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error } // NewCluster creates a new gocql cluster from the configuration -func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, error) { - cluster := gocql.NewCluster(c.Servers...) - cluster.Keyspace = c.Keyspace - cluster.NumConns = c.ConnectionsPerHost - cluster.Timeout = c.Timeout - cluster.ConnectTimeout = c.ConnectTimeout - cluster.ReconnectInterval = c.ReconnectInterval - cluster.SocketKeepalive = c.SocketKeepAlive - if c.ProtoVersion > 0 { - cluster.ProtoVersion = c.ProtoVersion +func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { + cluster := gocql.NewCluster(c.Connection.Servers...) + cluster.Keyspace = c.Schema.Keyspace + cluster.NumConns = c.Connection.ConnectionsPerHost + cluster.ConnectTimeout = c.Connection.Timeout + cluster.ReconnectInterval = c.Connection.ReconnectInterval + cluster.SocketKeepalive = c.Connection.SocketKeepAlive + cluster.Timeout = c.Query.Timeout + if c.Connection.ProtoVersion > 0 { + cluster.ProtoVersion = c.Connection.ProtoVersion } - if c.MaxRetryAttempts > 1 { - cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.MaxRetryAttempts - 1} + if c.Query.MaxRetryAttempts > 1 { + cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Query.MaxRetryAttempts - 1} } - if c.Port != 0 { - cluster.Port = c.Port + if c.Connection.Port != 0 { + cluster.Port = c.Connection.Port } - if !c.DisableCompression { + if !c.Schema.DisableCompression { cluster.Compressor = gocql.SnappyCompressor{} } - if c.Consistency == "" { + if c.Query.Consistency == "" { cluster.Consistency = gocql.LocalOne } else { - cluster.Consistency = gocql.ParseConsistency(c.Consistency) + cluster.Consistency = gocql.ParseConsistency(c.Query.Consistency) } fallbackHostSelectionPolicy := gocql.RoundRobinHostPolicy() - if c.LocalDC != "" { - fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.LocalDC) + if c.Connection.LocalDC != "" { + fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.Connection.LocalDC) } cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallbackHostSelectionPolicy, gocql.ShuffleReplicas()) - if c.Authenticator.Basic.Username != "" && c.Authenticator.Basic.Password != "" { + if c.Connection.Authenticator.Basic.Username != "" && c.Connection.Authenticator.Basic.Password != "" { cluster.Authenticator = gocql.PasswordAuthenticator{ - Username: c.Authenticator.Basic.Username, - Password: c.Authenticator.Basic.Password, - AllowedAuthenticators: c.Authenticator.Basic.AllowedAuthenticators, + Username: c.Connection.Authenticator.Basic.Username, + Password: c.Connection.Authenticator.Basic.Password, + AllowedAuthenticators: c.Connection.Authenticator.Basic.AllowedAuthenticators, } } - tlsCfg, err := c.TLS.Config(logger) - if err != nil { - return nil, err - } - if c.TLS.Enabled { + if !c.Connection.TLS.Insecure { + tlsCfg, err := c.Connection.TLS.LoadTLSConfig(context.Background()) + if err != nil { + return nil, err + } cluster.SslOpts = &gocql.SslOptions{ Config: tlsCfg, } } // If tunneling connection to C*, disable cluster autodiscovery features. - if c.DisableAutoDiscovery { + if c.Connection.DisableAutoDiscovery { cluster.DisableInitialHostLookup = true cluster.IgnorePeerAddr = true } return cluster, nil } -func (c *Configuration) Close() error { - return c.TLS.Close() -} - func (c *Configuration) String() string { return fmt.Sprintf("%+v", *c) } diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go new file mode 100644 index 00000000000..8d532f65c60 --- /dev/null +++ b/pkg/cassandra/config/config_test.go @@ -0,0 +1,96 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package config + +import ( + "testing" + + "github.com/gocql/gocql" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestValidate_ReturnsErrorWhenInvalid(t *testing.T) { + tests := []struct { + name string + cfg *Configuration + }{ + { + name: "missing required fields", + cfg: &Configuration{}, + }, + { + name: "require fields in invalid format", + cfg: &Configuration{ + Connection: Connection{ + Servers: []string{"not a url"}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := test.cfg.Validate() + require.Error(t, err) + }) + } +} + +func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) { + cfg := Configuration{ + Connection: Connection{ + Servers: []string{"localhost:9200"}, + }, + } + + err := cfg.Validate() + require.NoError(t, err) +} + +func TestNewClusterWithDefaults(t *testing.T) { + cfg := DefaultConfiguration() + cl, err := cfg.NewCluster() + require.NoError(t, err) + assert.NotEmpty(t, cl.Keyspace) +} + +func TestNewClusterWithOverrides(t *testing.T) { + cfg := DefaultConfiguration() + cfg.Query.Consistency = "LOCAL_QUORUM" + cfg.Connection.LocalDC = "local_dc" + cfg.Connection.Authenticator.Basic.Username = "username" + cfg.Connection.Authenticator.Basic.Password = "password" + cfg.Connection.TLS.Insecure = false + cfg.Connection.DisableAutoDiscovery = true + cl, err := cfg.NewCluster() + require.NoError(t, err) + assert.NotEmpty(t, cl.Keyspace) + assert.Equal(t, gocql.LocalQuorum, cl.Consistency) + assert.NotNil(t, cl.PoolConfig.HostSelectionPolicy, "local_dc") + require.IsType(t, gocql.PasswordAuthenticator{}, cl.Authenticator) + auth := cl.Authenticator.(gocql.PasswordAuthenticator) + assert.Equal(t, "username", auth.Username) + assert.Equal(t, "password", auth.Password) + assert.NotNil(t, cl.SslOpts) + assert.True(t, cl.DisableInitialHostLookup) +} + +func TestApplyDefaults(t *testing.T) { + cfg1 := DefaultConfiguration() + cfg2 := Configuration{} + cfg2.ApplyDefaults(&cfg1) + assert.Equal(t, cfg2.Schema, cfg1.Schema) + assert.Equal(t, cfg2.Query, cfg1.Query) + assert.NotEqual(t, cfg2.Connection.Servers, cfg1.Connection.Servers, "servers not copied") + cfg1.Connection.Servers = nil + assert.Equal(t, cfg2.Connection, cfg1.Connection) +} + +func TestToString(t *testing.T) { + cfg := DefaultConfiguration() + cfg.Schema.Keyspace = "test" + s := cfg.String() + assert.Contains(t, s, "Keyspace:test") +} diff --git a/pkg/cassandra/config/empty_test.go b/pkg/cassandra/config/package_test.go similarity index 100% rename from pkg/cassandra/config/empty_test.go rename to pkg/cassandra/config/package_test.go diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 8dbc5c90fe2..e26c215bac7 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -137,14 +137,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) f.logger = logger - primarySession, err := f.primaryConfig.NewSession(logger) + primarySession, err := f.primaryConfig.NewSession() if err != nil { return err } f.primarySession = primarySession if f.archiveConfig != nil { - archiveSession, err := f.archiveConfig.NewSession(logger) + archiveSession, err := f.archiveConfig.NewSession() if err != nil { return err } @@ -251,12 +251,7 @@ func (f *Factory) Close() error { f.archiveSession.Close() } - var errs []error - if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { - errs = append(errs, cfg.TLS.Close()) - } - errs = append(errs, f.Options.GetPrimary().TLS.Close()) - return errors.Join(errs...) + return nil } func (f *Factory) Purge(_ context.Context) error { diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index a97a1abc985..f2ffd551485 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -34,7 +34,7 @@ func newMockSessionBuilder(session *mocks.Session, err error) *mockSessionBuilde } } -func (m *mockSessionBuilder) NewSession(*zap.Logger) (cassandra.Session, error) { +func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { return m.session, m.err } @@ -195,7 +195,9 @@ func TestNewFactoryWithConfig(t *testing.T) { opts := &Options{ Primary: NamespaceConfig{ Configuration: cassandraCfg.Configuration{ - Servers: []string{"localhost:9200"}, + Connection: cassandraCfg.Connection{ + Servers: []string{"localhost:9200"}, + }, }, }, } @@ -215,7 +217,9 @@ func TestNewFactoryWithConfig(t *testing.T) { opts := &Options{ Primary: NamespaceConfig{ Configuration: cassandraCfg.Configuration{ - Servers: []string{"localhost:9200"}, + Connection: cassandraCfg.Connection{ + Servers: []string{"localhost:9200"}, + }, }, }, } diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index 1a8e338c8d9..e266436ba29 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -136,43 +136,43 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { } flagSet.Int( nsConfig.namespace+suffixConnPerHost, - nsConfig.ConnectionsPerHost, + nsConfig.Connection.ConnectionsPerHost, "The number of Cassandra connections from a single backend instance") flagSet.Int( nsConfig.namespace+suffixMaxRetryAttempts, - nsConfig.MaxRetryAttempts, + nsConfig.Query.MaxRetryAttempts, "The number of attempts when reading from Cassandra") flagSet.Duration( nsConfig.namespace+suffixTimeout, - nsConfig.Timeout, + nsConfig.Query.Timeout, "Timeout used for queries. A Timeout of zero means no timeout") flagSet.Duration( nsConfig.namespace+suffixConnectTimeout, - nsConfig.ConnectTimeout, + nsConfig.Connection.Timeout, "Timeout used for connections to Cassandra Servers") flagSet.Duration( nsConfig.namespace+suffixReconnectInterval, - nsConfig.ReconnectInterval, + nsConfig.Connection.ReconnectInterval, "Reconnect interval to retry connecting to downed hosts") flagSet.String( nsConfig.namespace+suffixServers, - strings.Join(nsConfig.Servers, ","), + strings.Join(nsConfig.Connection.Servers, ","), "The comma-separated list of Cassandra servers") flagSet.Int( nsConfig.namespace+suffixPort, - nsConfig.Port, + nsConfig.Connection.Port, "The port for cassandra") flagSet.String( nsConfig.namespace+suffixKeyspace, - nsConfig.Keyspace, + nsConfig.Schema.Keyspace, "The Cassandra keyspace for Jaeger data") flagSet.String( nsConfig.namespace+suffixDC, - nsConfig.LocalDC, + nsConfig.Connection.LocalDC, "The name of the Cassandra local data center for DC Aware host selection") flagSet.String( nsConfig.namespace+suffixConsistency, - nsConfig.Consistency, + nsConfig.Query.Consistency, "The Cassandra consistency level, e.g. ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE (default LOCAL_ONE)") flagSet.Bool( nsConfig.namespace+suffixDisableCompression, @@ -180,19 +180,19 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { "Disables the use of the default Snappy Compression while connecting to the Cassandra Cluster if set to true. This is useful for connecting to Cassandra Clusters(like Azure Cosmos Db with Cassandra API) that do not support SnappyCompression") flagSet.Int( nsConfig.namespace+suffixProtoVer, - nsConfig.ProtoVersion, + nsConfig.Connection.ProtoVersion, "The Cassandra protocol version") flagSet.Duration( nsConfig.namespace+suffixSocketKeepAlive, - nsConfig.SocketKeepAlive, + nsConfig.Connection.SocketKeepAlive, "Cassandra's keepalive period to use, enabled if > 0") flagSet.String( nsConfig.namespace+suffixUsername, - nsConfig.Authenticator.Basic.Username, + nsConfig.Connection.Authenticator.Basic.Username, "Username for password authentication for Cassandra") flagSet.String( nsConfig.namespace+suffixPassword, - nsConfig.Authenticator.Basic.Password, + nsConfig.Connection.Authenticator.Basic.Password, "Password for password authentication for Cassandra") flagSet.String( nsConfig.namespace+suffixAuth, @@ -228,30 +228,31 @@ func (cfg *NamespaceConfig) initFromViper(v *viper.Viper) { if cfg.namespace != primaryStorageConfig { cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) } - cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost) - cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts) - cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) - cfg.ConnectTimeout = v.GetDuration(cfg.namespace + suffixConnectTimeout) - cfg.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval) + cfg.Connection.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost) + cfg.Query.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts) + cfg.Query.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) + cfg.Connection.Timeout = v.GetDuration(cfg.namespace + suffixConnectTimeout) + cfg.Connection.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval) servers := stripWhiteSpace(v.GetString(cfg.namespace + suffixServers)) - cfg.Servers = strings.Split(servers, ",") - cfg.Port = v.GetInt(cfg.namespace + suffixPort) - cfg.Keyspace = v.GetString(cfg.namespace + suffixKeyspace) - cfg.LocalDC = v.GetString(cfg.namespace + suffixDC) - cfg.Consistency = v.GetString(cfg.namespace + suffixConsistency) - cfg.ProtoVersion = v.GetInt(cfg.namespace + suffixProtoVer) - cfg.SocketKeepAlive = v.GetDuration(cfg.namespace + suffixSocketKeepAlive) - cfg.Authenticator.Basic.Username = v.GetString(cfg.namespace + suffixUsername) - cfg.Authenticator.Basic.Password = v.GetString(cfg.namespace + suffixPassword) + cfg.Connection.Servers = strings.Split(servers, ",") + cfg.Connection.Port = v.GetInt(cfg.namespace + suffixPort) + cfg.Schema.Keyspace = v.GetString(cfg.namespace + suffixKeyspace) + cfg.Connection.LocalDC = v.GetString(cfg.namespace + suffixDC) + cfg.Query.Consistency = v.GetString(cfg.namespace + suffixConsistency) + cfg.Connection.ProtoVersion = v.GetInt(cfg.namespace + suffixProtoVer) + cfg.Connection.SocketKeepAlive = v.GetDuration(cfg.namespace + suffixSocketKeepAlive) + cfg.Connection.Authenticator.Basic.Username = v.GetString(cfg.namespace + suffixUsername) + cfg.Connection.Authenticator.Basic.Password = v.GetString(cfg.namespace + suffixPassword) authentication := stripWhiteSpace(v.GetString(cfg.namespace + suffixAuth)) - cfg.Authenticator.Basic.AllowedAuthenticators = strings.Split(authentication, ",") - cfg.DisableCompression = v.GetBool(cfg.namespace + suffixDisableCompression) + cfg.Connection.Authenticator.Basic.AllowedAuthenticators = strings.Split(authentication, ",") + cfg.Schema.DisableCompression = v.GetBool(cfg.namespace + suffixDisableCompression) var err error - cfg.TLS, err = tlsFlagsConfig.InitFromViper(v) + tlsCfg, err := tlsFlagsConfig.InitFromViper(v) if err != nil { // TODO refactor to be able to return error log.Fatal(err) } + cfg.Connection.TLS = tlsCfg.ToOtelClientConfig() } // GetPrimary returns primary configuration. @@ -270,8 +271,8 @@ func (opt *Options) Get(namespace string) *config.Configuration { return nil } nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) - if len(nsCfg.Servers) == 0 { - nsCfg.Servers = opt.Primary.Servers + if len(nsCfg.Connection.Servers) == 0 { + nsCfg.Connection.Servers = opt.Primary.Connection.Servers } return &nsCfg.Configuration } diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index b2a00245169..8ccd813c838 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -17,9 +17,9 @@ import ( func TestOptions(t *testing.T) { opts := NewOptions("foo") primary := opts.GetPrimary() - assert.NotEmpty(t, primary.Keyspace) - assert.NotEmpty(t, primary.Servers) - assert.Equal(t, 2, primary.ConnectionsPerHost) + assert.NotEmpty(t, primary.Schema.Keyspace) + assert.NotEmpty(t, primary.Connection.Servers) + assert.Equal(t, 2, primary.Connection.ConnectionsPerHost) aux := opts.Get("archive") assert.Nil(t, aux) @@ -28,10 +28,10 @@ func TestOptions(t *testing.T) { opts.others["archive"].Enabled = true aux = opts.Get("archive") require.NotNil(t, aux) - assert.Equal(t, primary.Keyspace, aux.Keyspace) - assert.Equal(t, primary.Servers, aux.Servers) - assert.Equal(t, primary.ConnectionsPerHost, aux.ConnectionsPerHost) - assert.Equal(t, primary.ReconnectInterval, aux.ReconnectInterval) + assert.Equal(t, primary.Schema.Keyspace, aux.Schema.Keyspace) + assert.Equal(t, primary.Connection.Servers, aux.Connection.Servers) + assert.Equal(t, primary.Connection.ConnectionsPerHost, aux.Connection.ConnectionsPerHost) + assert.Equal(t, primary.Connection.ReconnectInterval, aux.Connection.ReconnectInterval) } func TestOptionsWithFlags(t *testing.T) { @@ -67,11 +67,11 @@ func TestOptionsWithFlags(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.Equal(t, "jaeger", primary.Keyspace) - assert.Equal(t, "mojave", primary.LocalDC) - assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) - assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.datastax.bdp.cassandra.auth.DseAuthenticator"}, primary.Authenticator.Basic.AllowedAuthenticators) - assert.Equal(t, "ONE", primary.Consistency) + assert.Equal(t, "jaeger", primary.Schema.Keyspace) + assert.Equal(t, "mojave", primary.Connection.LocalDC) + assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Connection.Servers) + assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.datastax.bdp.cassandra.auth.DseAuthenticator"}, primary.Connection.Authenticator.Basic.AllowedAuthenticators) + assert.Equal(t, "ONE", primary.Query.Consistency) assert.Equal(t, []string{"blerg", "blarg", "blorg"}, opts.TagIndexBlacklist()) assert.Equal(t, []string{"flerg", "flarg", "florg"}, opts.TagIndexWhitelist()) assert.True(t, opts.Index.Tags) @@ -80,17 +80,17 @@ func TestOptionsWithFlags(t *testing.T) { aux := opts.Get("cas-aux") require.NotNil(t, aux) - assert.Equal(t, "jaeger-archive", aux.Keyspace) - assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) - assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Authenticator.Basic.AllowedAuthenticators) - assert.Equal(t, 42, aux.ConnectionsPerHost) - assert.Equal(t, 42, aux.MaxRetryAttempts) - assert.Equal(t, 42*time.Second, aux.Timeout) - assert.Equal(t, 42*time.Second, aux.ReconnectInterval) - assert.Equal(t, 4242, aux.Port) - assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary") - assert.Equal(t, 3, aux.ProtoVersion) - assert.Equal(t, 42*time.Second, aux.SocketKeepAlive) + assert.Equal(t, "jaeger-archive", aux.Schema.Keyspace) + assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Connection.Servers) + assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Connection.Authenticator.Basic.AllowedAuthenticators) + assert.Equal(t, 42, aux.Connection.ConnectionsPerHost) + assert.Equal(t, 42, aux.Query.MaxRetryAttempts) + assert.Equal(t, 42*time.Second, aux.Query.Timeout) + assert.Equal(t, 42*time.Second, aux.Connection.ReconnectInterval) + assert.Equal(t, 4242, aux.Connection.Port) + assert.Equal(t, "", aux.Query.Consistency, "aux storage does not inherit consistency from primary") + assert.Equal(t, 3, aux.Connection.ProtoVersion) + assert.Equal(t, 42*time.Second, aux.Connection.SocketKeepAlive) } func TestDefaultTlsHostVerify(t *testing.T) { @@ -102,7 +102,7 @@ func TestDefaultTlsHostVerify(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.False(t, primary.TLS.SkipHostVerify) + assert.False(t, primary.Connection.TLS.InsecureSkipVerify) } func TestEmptyBlackWhiteLists(t *testing.T) { diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index 632d9c765e9..00bfe9b4ed5 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -24,13 +24,19 @@ var logger, _ = zap.NewDevelopment() func main() { noScope := metrics.NullFactory cConfig := &cascfg.Configuration{ - Servers: []string{"127.0.0.1"}, - ConnectionsPerHost: 10, - Timeout: time.Millisecond * 750, - ProtoVersion: 4, - Keyspace: "jaeger_v1_test", + Schema: cascfg.Schema{ + Keyspace: "jaeger_v1_test", + }, + Connection: cascfg.Connection{ + Servers: []string{"127.0.0.1"}, + ConnectionsPerHost: 10, + ProtoVersion: 4, + }, + Query: cascfg.Query{ + Timeout: time.Millisecond * 750, + }, } - cqlSession, err := cConfig.NewSession(logger) + cqlSession, err := cConfig.NewSession() if err != nil { logger.Fatal("Cannot create Cassandra session", zap.Error(err)) }