Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Align Cassandra Storage Config With OTEL #5949

Merged
merged 22 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,26 @@ extensions:
backends:
some_storage:
cassandra:
keyspace: "jaeger_v1_dc1"
username: "cassandra"
password: "cassandra"
schema:
keyspace: "jaeger_v1_dc1"
connection:
auth:
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
basic:
username: "cassandra"
password: "cassandra"
tls:
insecure: true
another_storage:
cassandra:
keyspace: "jaeger_v1_dc1"
username: "cassandra"
password: "cassandra"
schema:
keyspace: "jaeger_v1_dc1"
connection:
auth:
basic:
username: "cassandra"
password: "cassandra"
tls:
insecure: true
receivers:
otlp:
protocols:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ backends:
`)
cfg := createDefaultConfig().(*Config)
require.NoError(t, conf.Unmarshal(cfg))
assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Servers)
assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Connection.Servers)
}

func TestConfigDefaultElasticsearch(t *testing.T) {
Expand Down
209 changes: 122 additions & 87 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,99 +5,138 @@
package config

import (
"context"
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.uber.org/zap"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/cassandra"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
)

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
// Configuration describes the configuration properties needed to connect to a Cassandra cluster.
type Configuration struct {
Servers []string `valid:"required,url" mapstructure:"servers"`
Keyspace string `mapstructure:"keyspace"`
LocalDC string `mapstructure:"local_dc"`
ConnectionsPerHost int `mapstructure:"connections_per_host"`
Timeout time.Duration `mapstructure:"-"`
ConnectTimeout time.Duration `mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
ProtoVersion int `mapstructure:"proto_version"`
Consistency string `mapstructure:"consistency"`
DisableCompression bool `mapstructure:"disable_compression"`
Port int `mapstructure:"port"`
Authenticator Authenticator `mapstructure:",squash"`
DisableAutoDiscovery bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
Schema Schema `mapstructure:"schema"`
Connection Connection `mapstructure:"connection"`
Query Query `mapstructure:"query"`
}

func DefaultConfiguration() Configuration {
return Configuration{
Servers: []string{"127.0.0.1"},
Port: 9042,
MaxRetryAttempts: 3,
Keyspace: "jaeger_v1_test",
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
}
type Connection struct {
// Servers contains a list of hosts that are used to connect to the cluster.
Servers []string `mapstructure:"servers" valid:"required,url"`
// LocalDC contains the name of the local Data Center (DC) for DC-aware host selection
LocalDC string `mapstructure:"local_dc"`
// The port used when dialing to a cluster.
Port int `mapstructure:"port"`
// DisableAutoDiscovery, if set to true, will disable the cluster's auto-discovery features.
DisableAutoDiscovery bool `mapstructure:"disable_auto_discovery"`
// ConnectionsPerHost contains the maximum number of open connections for each host on the cluster.
ConnectionsPerHost int `mapstructure:"connections_per_host"`
// ReconnectInterval contains the regular interval after which the driver tries to connect to
// nodes that are down.
ReconnectInterval time.Duration `mapstructure:"reconnect_interval"`
// SocketKeepAlive contains the keep alive period for the default dialer to the cluster.
SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"`
// TLS contains the TLS configuration for the connection to the cluster.
TLS configtls.ClientConfig `mapstructure:"tls"`
// Timeout contains the maximum time spent to connect to a cluster.
Timeout time.Duration `mapstructure:"timeout"`
// Authenticator contains the details of the authentication mechanism that is used for
// connecting to a cluster.
Authenticator Authenticator `mapstructure:"auth"`
// ProtoVersion contains the version of the native protocol to use when connecting to a cluster.
ProtoVersion int `mapstructure:"proto_version"`
}

type Schema struct {
// Keyspace contains the namespace where Jaeger data will be stored.
Keyspace string `mapstructure:"keyspace"`
// DisableCompression, if set to true, disables the use of the default Snappy Compression
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster
type Query struct {
// Timeout contains the maximum time spent executing a query.
Timeout time.Duration `mapstructure:"timeout"`
// MaxRetryAttempts indicates the maximum number of times a query will be retried for execution.
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
// Consistency specifies the consistency level which needs to be satisified before responding
// to a query.
Consistency string `mapstructure:"consistency"`
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster.
type Authenticator struct {
Basic BasicAuthenticator `yaml:"basic" mapstructure:",squash"`
Basic BasicAuthenticator `mapstructure:"basic"`
// TODO: add more auth types
}

// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster
// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster.
type BasicAuthenticator struct {
Username string `yaml:"username" mapstructure:"username"`
Password string `yaml:"password" mapstructure:"password" json:"-"`
AllowedAuthenticators []string `yaml:"allowed_authenticators" mapstructure:"allowed_authenticators"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
AllowedAuthenticators []string `mapstructure:"allowed_authenticators"`
}

func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Port: 9042,
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
},
Query: Query{
MaxRetryAttempts: 3,
},
}
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.ConnectionsPerHost == 0 {
c.ConnectionsPerHost = source.ConnectionsPerHost
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace

Check warning on line 107 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L107

Added line #L107 was not covered by tests
}
if c.MaxRetryAttempts == 0 {
c.MaxRetryAttempts = source.MaxRetryAttempts
if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
if c.Timeout == 0 {
c.Timeout = source.Timeout
if c.Connection.ReconnectInterval == 0 {
c.Connection.ReconnectInterval = source.Connection.ReconnectInterval
}
if c.ReconnectInterval == 0 {
c.ReconnectInterval = source.ReconnectInterval
if c.Connection.Port == 0 {
c.Connection.Port = source.Connection.Port
}
if c.Port == 0 {
c.Port = source.Port
if c.Connection.ProtoVersion == 0 {
c.Connection.ProtoVersion = source.Connection.ProtoVersion
}
if c.Keyspace == "" {
c.Keyspace = source.Keyspace
if c.Connection.SocketKeepAlive == 0 {
c.Connection.SocketKeepAlive = source.Connection.SocketKeepAlive
}
if c.ProtoVersion == 0 {
c.ProtoVersion = source.ProtoVersion
if c.Query.MaxRetryAttempts == 0 {
c.Query.MaxRetryAttempts = source.Query.MaxRetryAttempts
}
if c.SocketKeepAlive == 0 {
c.SocketKeepAlive = source.SocketKeepAlive
if c.Query.Timeout == 0 {
c.Query.Timeout = source.Query.Timeout
}
}

// SessionBuilder creates new cassandra.Session
type SessionBuilder interface {
NewSession(logger *zap.Logger) (cassandra.Session, error)
NewSession() (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error) {
cluster, err := c.NewCluster(logger)
func (c *Configuration) NewSession() (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}
Expand All @@ -109,68 +148,64 @@
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Servers...)
cluster.Keyspace = c.Keyspace
cluster.NumConns = c.ConnectionsPerHost
cluster.Timeout = c.Timeout
cluster.ConnectTimeout = c.ConnectTimeout
cluster.ReconnectInterval = c.ReconnectInterval
cluster.SocketKeepalive = c.SocketKeepAlive
if c.ProtoVersion > 0 {
cluster.ProtoVersion = c.ProtoVersion
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
cluster.Keyspace = c.Schema.Keyspace
cluster.NumConns = c.Connection.ConnectionsPerHost
cluster.ConnectTimeout = c.Connection.Timeout
cluster.ReconnectInterval = c.Connection.ReconnectInterval
cluster.SocketKeepalive = c.Connection.SocketKeepAlive
cluster.Timeout = c.Query.Timeout
if c.Connection.ProtoVersion > 0 {
cluster.ProtoVersion = c.Connection.ProtoVersion
}
if c.MaxRetryAttempts > 1 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.MaxRetryAttempts - 1}
if c.Query.MaxRetryAttempts > 1 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Query.MaxRetryAttempts - 1}
}
if c.Port != 0 {
cluster.Port = c.Port
if c.Connection.Port != 0 {
cluster.Port = c.Connection.Port
}

if !c.DisableCompression {
if !c.Schema.DisableCompression {
cluster.Compressor = gocql.SnappyCompressor{}
}

if c.Consistency == "" {
if c.Query.Consistency == "" {
cluster.Consistency = gocql.LocalOne
} else {
cluster.Consistency = gocql.ParseConsistency(c.Consistency)
cluster.Consistency = gocql.ParseConsistency(c.Query.Consistency)

Check warning on line 176 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L176

Added line #L176 was not covered by tests
}

fallbackHostSelectionPolicy := gocql.RoundRobinHostPolicy()
if c.LocalDC != "" {
fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.LocalDC)
if c.Connection.LocalDC != "" {
fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.Connection.LocalDC)

Check warning on line 181 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L181

Added line #L181 was not covered by tests
}
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallbackHostSelectionPolicy, gocql.ShuffleReplicas())

if c.Authenticator.Basic.Username != "" && c.Authenticator.Basic.Password != "" {
if c.Connection.Authenticator.Basic.Username != "" && c.Connection.Authenticator.Basic.Password != "" {
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: c.Authenticator.Basic.Username,
Password: c.Authenticator.Basic.Password,
AllowedAuthenticators: c.Authenticator.Basic.AllowedAuthenticators,
Username: c.Connection.Authenticator.Basic.Username,
Password: c.Connection.Authenticator.Basic.Password,
AllowedAuthenticators: c.Connection.Authenticator.Basic.AllowedAuthenticators,
}
}
tlsCfg, err := c.TLS.Config(logger)
if err != nil {
return nil, err
}
if c.TLS.Enabled {
if !c.Connection.TLS.Insecure {
tlsCfg, err := c.Connection.TLS.LoadTLSConfig(context.Background())
if err != nil {
return nil, err

Check warning on line 195 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L193-L195

Added lines #L193 - L195 were not covered by tests
}
cluster.SslOpts = &gocql.SslOptions{
Config: tlsCfg,
}
}
// If tunneling connection to C*, disable cluster autodiscovery features.
if c.DisableAutoDiscovery {
if c.Connection.DisableAutoDiscovery {
cluster.DisableInitialHostLookup = true
cluster.IgnorePeerAddr = true
}
return cluster, nil
}

func (c *Configuration) Close() error {
return c.TLS.Close()
}

func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/cassandra/config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2024 The Jaeger Authors.
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

package config

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidate_ReturnsErrorWhenInvalid(t *testing.T) {
tests := []struct {
name string
cfg *Configuration
}{
{
name: "missing required fields",
cfg: &Configuration{},
},
{
name: "require fields in invalid format",
cfg: &Configuration{
Connection: Connection{
Servers: []string{"not a url"},
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := test.cfg.Validate()
require.Error(t, err)
})
}
}

func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) {
cfg := Configuration{
Connection: Connection{
Servers: []string{"localhost:9200"},
},
}

err := cfg.Validate()
require.NoError(t, err)
}
Loading
Loading