Skip to content

Commit

Permalink
Create Separate Grouping For Query
Browse files Browse the repository at this point in the history
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
  • Loading branch information
mahadzaryab1 committed Sep 10, 2024
1 parent 110753a commit 71d5da3
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 26 deletions.
35 changes: 20 additions & 15 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type Configuration struct {
Schema Schema `mapstructure:"schema"`
Connection Connection `mapstructure:"connection"`
Query Query `mapstructure:"query"`
}

type Connection struct {
Expand All @@ -31,10 +32,8 @@ type Connection struct {
ConnectionsPerHost int `mapstructure:"connections_per_host"`
ReconnectInterval time.Duration `mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
TLS configtls.ClientConfig `mapstructure:"tls"`
QueryTimeout time.Duration `mapstructure:"query_timeout"`
ConnectTimeout time.Duration `mapstructure:"connection_timeout"`
Timeout time.Duration `mapstructure:"timeout"`
Authenticator Authenticator `mapstructure:"auth"`
ProtoVersion int `mapstructure:"proto_version"`
Consistency string `mapstructure:"consistency"`
Expand All @@ -45,6 +44,11 @@ type Schema struct {
DisableCompression bool `mapstructure:"disable_compression"`
}

type Query struct {
Timeout time.Duration `mapstructure:"timeout"`
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster
type Authenticator struct {
Basic BasicAuthenticator `mapstructure:"basic"`
Expand All @@ -60,16 +64,18 @@ type BasicAuthenticator struct {

func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Port: 9042,
MaxRetryAttempts: 3,
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
},
Schema: Schema{
Keyspace: "jaeger_v1_test",
Query: Query{
MaxRetryAttempts: 3,
},
}
}
Expand All @@ -79,11 +85,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
if c.Connection.MaxRetryAttempts == 0 {
c.Connection.MaxRetryAttempts = source.Connection.MaxRetryAttempts
if c.Query.MaxRetryAttempts == 0 {
c.Query.MaxRetryAttempts = source.Query.MaxRetryAttempts
}
if c.Connection.QueryTimeout == 0 {
c.Connection.QueryTimeout = source.Connection.QueryTimeout
if c.Query.Timeout == 0 {
c.Query.Timeout = source.Query.Timeout
}
if c.Connection.ReconnectInterval == 0 {
c.Connection.ReconnectInterval = source.Connection.ReconnectInterval
Expand All @@ -97,7 +103,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Connection.SocketKeepAlive == 0 {
c.Connection.SocketKeepAlive = source.Connection.SocketKeepAlive
}

if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace
}
Expand Down Expand Up @@ -126,15 +131,15 @@ func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
cluster.Keyspace = c.Schema.Keyspace
cluster.NumConns = c.Connection.ConnectionsPerHost
cluster.Timeout = c.Connection.QueryTimeout
cluster.ConnectTimeout = c.Connection.ConnectTimeout
cluster.Timeout = c.Query.Timeout
cluster.ConnectTimeout = c.Connection.Timeout
cluster.ReconnectInterval = c.Connection.ReconnectInterval
cluster.SocketKeepalive = c.Connection.SocketKeepAlive
if c.Connection.ProtoVersion > 0 {
cluster.ProtoVersion = c.Connection.ProtoVersion
}
if c.Connection.MaxRetryAttempts > 1 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Connection.MaxRetryAttempts - 1}
if c.Query.MaxRetryAttempts > 1 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Query.MaxRetryAttempts - 1}
}
if c.Connection.Port != 0 {
cluster.Port = c.Connection.Port
Expand Down
12 changes: 6 additions & 6 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,15 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) {
"The number of Cassandra connections from a single backend instance")
flagSet.Int(
nsConfig.namespace+suffixMaxRetryAttempts,
nsConfig.Connection.MaxRetryAttempts,
nsConfig.Query.MaxRetryAttempts,
"The number of attempts when reading from Cassandra")
flagSet.Duration(
nsConfig.namespace+suffixTimeout,
nsConfig.Connection.QueryTimeout,
nsConfig.Query.Timeout,
"Timeout used for queries. A Timeout of zero means no timeout")
flagSet.Duration(
nsConfig.namespace+suffixConnectTimeout,
nsConfig.Connection.ConnectTimeout,
nsConfig.Connection.Timeout,
"Timeout used for connections to Cassandra Servers")
flagSet.Duration(
nsConfig.namespace+suffixReconnectInterval,
Expand Down Expand Up @@ -229,9 +229,9 @@ func (cfg *NamespaceConfig) initFromViper(v *viper.Viper) {
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
}
cfg.Connection.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost)
cfg.Connection.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
cfg.Connection.QueryTimeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.Connection.ConnectTimeout = v.GetDuration(cfg.namespace + suffixConnectTimeout)
cfg.Query.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
cfg.Query.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.Connection.Timeout = v.GetDuration(cfg.namespace + suffixConnectTimeout)
cfg.Connection.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval)
servers := stripWhiteSpace(v.GetString(cfg.namespace + suffixServers))
cfg.Connection.Servers = strings.Split(servers, ",")
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Connection.Servers)
assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Connection.Authenticator.Basic.AllowedAuthenticators)
assert.Equal(t, 42, aux.Connection.ConnectionsPerHost)
assert.Equal(t, 42, aux.Connection.MaxRetryAttempts)
assert.Equal(t, 42*time.Second, aux.Connection.QueryTimeout)
assert.Equal(t, 42, aux.Query.MaxRetryAttempts)
assert.Equal(t, 42*time.Second, aux.Query.Timeout)
assert.Equal(t, 42*time.Second, aux.Connection.ReconnectInterval)
assert.Equal(t, 4242, aux.Connection.Port)
assert.Equal(t, "", aux.Connection.Consistency, "aux storage does not inherit consistency from primary")
Expand Down
8 changes: 5 additions & 3 deletions plugin/storage/cassandra/savetracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ var logger, _ = zap.NewDevelopment()
func main() {
noScope := metrics.NullFactory
cConfig := &cascfg.Configuration{
Schema: cascfg.Schema{
Keyspace: "jaeger_v1_test",
},
Connection: cascfg.Connection{
Servers: []string{"127.0.0.1"},
ConnectionsPerHost: 10,
QueryTimeout: time.Millisecond * 750,
ProtoVersion: 4,
},
Schema: cascfg.Schema{
Keyspace: "jaeger_v1_test",
Query: cascfg.Query{
Timeout: time.Millisecond * 750,
},
}
cqlSession, err := cConfig.NewSession()
Expand Down

0 comments on commit 71d5da3

Please sign in to comment.