From 4c029cb2064275ae1ecba171d33f5023fb5f025d Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 19 Sep 2024 19:49:56 -0600 Subject: [PATCH 1/4] Migrate Kafka Auth Config To OTEL TLS Type Signed-off-by: Mahad Zaryab --- cmd/ingester/app/builder/builder.go | 3 ++- cmd/ingester/app/flags_test.go | 12 ++++++------ cmd/ingester/main.go | 3 --- pkg/kafka/auth/config.go | 22 ++++++++++++---------- pkg/kafka/auth/config_test.go | 19 ++++++++----------- pkg/kafka/auth/tls.go | 11 +++++------ pkg/kafka/auth/tls_test.go | 13 +++++-------- pkg/kafka/consumer/config.go | 6 +++--- pkg/kafka/consumer/config_test.go | 5 ++--- pkg/kafka/producer/config.go | 8 ++++---- pkg/kafka/producer/config_test.go | 5 ++--- plugin/storage/kafka/factory.go | 4 ++-- plugin/storage/kafka/factory_test.go | 3 ++- plugin/storage/kafka/options_test.go | 12 ++++++------ 14 files changed, 59 insertions(+), 67 deletions(-) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index dc4abbec2fc..9676a794fb7 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -4,6 +4,7 @@ package builder import ( + "context" "fmt" "strings" @@ -50,7 +51,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit RackID: options.RackID, FetchMaxMessageBytes: options.FetchMaxMessageBytes, } - saramaConsumer, err := consumerConfig.NewConsumer(logger) + saramaConsumer, err := consumerConfig.NewConsumer(context.Background()) if err != nil { return nil, err } diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 9296c99b71d..61d8c49d009 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -10,9 +10,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/kafka/auth" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/kafka" @@ -56,23 +56,23 @@ func TestTLSFlags(t *testing.T) { }{ { flags: []string{}, - expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: true}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=foo"}, - expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: true}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain}, }, } diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 00d9f28e58d..39bb8c971a3 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -68,9 +68,6 @@ func main() { consumer.Start() svc.RunAndThen(func() { - if err := options.TLS.Close(); err != nil { - logger.Error("Failed to close TLS certificates watcher", zap.Error(err)) - } if err = consumer.Close(); err != nil { logger.Error("Failed to close consumer", zap.Error(err)) } diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index 955a347777c..a0b4e45ff31 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -4,12 +4,13 @@ package auth import ( + "context" "fmt" "strings" "github.com/Shopify/sarama" "github.com/spf13/viper" - "go.uber.org/zap" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) @@ -30,20 +31,20 @@ var authTypes = []string{ // AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster type AuthenticationConfig struct { - Authentication string `mapstructure:"type"` - Kerberos KerberosConfig `mapstructure:"kerberos"` - TLS tlscfg.Options `mapstructure:"tls"` - PlainText PlainTextConfig `mapstructure:"plaintext"` + Authentication string `mapstructure:"type"` + Kerberos KerberosConfig `mapstructure:"kerberos"` + TLS configtls.ClientConfig `mapstructure:"tls"` + PlainText PlainTextConfig `mapstructure:"plaintext"` } // SetConfiguration set configure authentication into sarama config structure -func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config, logger *zap.Logger) error { +func (config *AuthenticationConfig) SetConfiguration(ctx context.Context, saramaConfig *sarama.Config) error { authentication := strings.ToLower(config.Authentication) if strings.Trim(authentication, " ") == "" { authentication = none } - if config.Authentication == tls || config.TLS.Enabled { - err := setTLSConfiguration(&config.TLS, saramaConfig, logger) + if config.Authentication == tls || !config.TLS.Insecure { + err := setTLSConfiguration(ctx, &config.TLS, saramaConfig) if err != nil { return err } @@ -80,13 +81,14 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. } var err error - config.TLS, err = tlsClientConfig.InitFromViper(v) + tlsCfg, err := tlsClientConfig.InitFromViper(v) if err != nil { return fmt.Errorf("failed to process Kafka TLS options: %w", err) } if config.Authentication == tls { - config.TLS.Enabled = true + tlsCfg.Enabled = true } + config.TLS = tlsCfg.ToOtelClientConfig() config.PlainText.Username = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUsername) config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword) diff --git a/pkg/kafka/auth/config_test.go b/pkg/kafka/auth/config_test.go index a32b0077c02..d056d744251 100644 --- a/pkg/kafka/auth/config_test.go +++ b/pkg/kafka/auth/config_test.go @@ -4,6 +4,7 @@ package auth import ( + "context" "flag" "testing" @@ -11,11 +12,9 @@ import ( "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) func addFlags(flags *flag.FlagSet) { @@ -64,8 +63,8 @@ func Test_InitFromViper(t *testing.T) { KeyTabPath: "/path/to/keytab", DisablePAFXFast: true, }, - TLS: tlscfg.Options{ - Enabled: true, + TLS: configtls.ClientConfig{ + Insecure: false, }, PlainText: PlainTextConfig{ Username: "user", @@ -77,16 +76,15 @@ func Test_InitFromViper(t *testing.T) { } // Test plaintext with different mechanisms -func testPlaintext(v *viper.Viper, t *testing.T, configPrefix string, logger *zap.Logger, mechanism string, saramaConfig *sarama.Config) { +func testPlaintext(ctx context.Context, v *viper.Viper, t *testing.T, configPrefix string, mechanism string, saramaConfig *sarama.Config) { v.Set(configPrefix+plainTextPrefix+suffixPlainTextMechanism, mechanism) authConfig := &AuthenticationConfig{} err := authConfig.InitFromViper(configPrefix, v) require.NoError(t, err) - require.NoError(t, authConfig.SetConfiguration(saramaConfig, logger)) + require.NoError(t, authConfig.SetConfiguration(ctx, saramaConfig)) } func TestSetConfiguration(t *testing.T) { - logger := zaptest.NewLogger(t) saramaConfig := sarama.NewConfig() configPrefix := "kafka.auth" v, command := config.Viperize(addFlags) @@ -149,7 +147,6 @@ func TestSetConfiguration(t *testing.T) { "--kafka.auth.authentication=" + tt.authType, }) authConfig := &AuthenticationConfig{} - defer authConfig.TLS.Close() err := authConfig.InitFromViper(configPrefix, v) require.NoError(t, err) @@ -159,10 +156,10 @@ func TestSetConfiguration(t *testing.T) { if len(tt.plainTextMechanisms) > 0 { for _, mechanism := range tt.plainTextMechanisms { - testPlaintext(v, t, configPrefix, logger, mechanism, saramaConfig) + testPlaintext(context.Background(), v, t, configPrefix, mechanism, saramaConfig) } } else { - err = authConfig.SetConfiguration(saramaConfig, logger) + err = authConfig.SetConfiguration(context.Background(), saramaConfig) if tt.expectedError != "" { require.EqualError(t, err, tt.expectedError) } else { diff --git a/pkg/kafka/auth/tls.go b/pkg/kafka/auth/tls.go index db905b7fc21..3e15433fc7f 100644 --- a/pkg/kafka/auth/tls.go +++ b/pkg/kafka/auth/tls.go @@ -4,17 +4,16 @@ package auth import ( + "context" "fmt" "github.com/Shopify/sarama" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "go.opentelemetry.io/collector/config/configtls" ) -func setTLSConfiguration(config *tlscfg.Options, saramaConfig *sarama.Config, logger *zap.Logger) error { - if config.Enabled { - tlsConfig, err := config.Config(logger) +func setTLSConfiguration(ctx context.Context, config *configtls.ClientConfig, saramaConfig *sarama.Config) error { + if !config.Insecure { + tlsConfig, err := config.LoadTLSConfig(ctx) if err != nil { return fmt.Errorf("error loading tls config: %w", err) } diff --git a/pkg/kafka/auth/tls_test.go b/pkg/kafka/auth/tls_test.go index d8dadf6cd64..2f895124afc 100644 --- a/pkg/kafka/auth/tls_test.go +++ b/pkg/kafka/auth/tls_test.go @@ -4,25 +4,22 @@ package auth import ( + "context" "testing" "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" - - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "go.opentelemetry.io/collector/config/configtls" ) func TestSetTLSConfiguration(t *testing.T) { - logger := zaptest.NewLogger(t) saramaConfig := sarama.NewConfig() - tlsConfig := &tlscfg.Options{ - Enabled: true, + tlsConfig := &configtls.ClientConfig{ + Insecure: false, } - err := setTLSConfiguration(tlsConfig, saramaConfig, logger) + err := setTLSConfiguration(context.Background(), tlsConfig, saramaConfig) require.NoError(t, err) assert.True(t, saramaConfig.Net.TLS.Enable) assert.NotNil(t, saramaConfig.Net.TLS.Config) - defer tlsConfig.Close() } diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 0a44c20f6e0..e47754ce067 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -4,12 +4,12 @@ package consumer import ( + "context" "io" "time" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" - "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) @@ -42,7 +42,7 @@ type Configuration struct { } // NewConsumer creates a new kafka consumer -func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) { +func (c *Configuration) NewConsumer(ctx context.Context) (Consumer, error) { saramaConfig := cluster.NewConfig() saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID @@ -55,7 +55,7 @@ func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) { } saramaConfig.Config.Version = ver } - if err := c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config, logger); err != nil { + if err := c.AuthenticationConfig.SetConfiguration(ctx, &saramaConfig.Config); err != nil { return nil, err } // cluster.NewConfig() uses sarama.NewConfig() to create the config. diff --git a/pkg/kafka/consumer/config_test.go b/pkg/kafka/consumer/config_test.go index 563fc9d6d1a..1228db1303f 100644 --- a/pkg/kafka/consumer/config_test.go +++ b/pkg/kafka/consumer/config_test.go @@ -4,17 +4,16 @@ package consumer import ( + "context" "testing" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) func TestSetConfiguration(t *testing.T) { - logger := zaptest.NewLogger(t) test := &Configuration{AuthenticationConfig: auth.AuthenticationConfig{Authentication: "fail"}} - _, err := test.NewConsumer(logger) + _, err := test.NewConsumer(context.Background()) require.EqualError(t, err, "Unknown/Unsupported authentication method fail to kafka cluster") } diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 22402784e60..860c16e98cb 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -4,17 +4,17 @@ package producer import ( + "context" "time" "github.com/Shopify/sarama" - "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) // Builder builds a new kafka producer type Builder interface { - NewProducer(logger *zap.Logger) (sarama.AsyncProducer, error) + NewProducer(ctx context.Context) (sarama.AsyncProducer, error) } // Configuration describes the configuration properties needed to create a Kafka producer @@ -33,7 +33,7 @@ type Configuration struct { } // NewProducer creates a new asynchronous kafka producer -func (c *Configuration) NewProducer(logger *zap.Logger) (sarama.AsyncProducer, error) { +func (c *Configuration) NewProducer(ctx context.Context) (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.RequiredAcks = c.RequiredAcks saramaConfig.Producer.Compression = c.Compression @@ -51,7 +51,7 @@ func (c *Configuration) NewProducer(logger *zap.Logger) (sarama.AsyncProducer, e } saramaConfig.Version = ver } - if err := c.AuthenticationConfig.SetConfiguration(saramaConfig, logger); err != nil { + if err := c.AuthenticationConfig.SetConfiguration(ctx, saramaConfig); err != nil { return nil, err } return sarama.NewAsyncProducer(c.Brokers, saramaConfig) diff --git a/pkg/kafka/producer/config_test.go b/pkg/kafka/producer/config_test.go index 87bd8631c69..acd6245b704 100644 --- a/pkg/kafka/producer/config_test.go +++ b/pkg/kafka/producer/config_test.go @@ -4,17 +4,16 @@ package producer import ( + "context" "testing" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) func TestSetConfiguration(t *testing.T) { - logger := zaptest.NewLogger(t) test := &Configuration{AuthenticationConfig: auth.AuthenticationConfig{Authentication: "fail"}} - _, err := test.NewProducer(logger) + _, err := test.NewProducer(context.Background()) require.EqualError(t, err, "Unknown/Unsupported authentication method fail to kafka cluster") } diff --git a/plugin/storage/kafka/factory.go b/plugin/storage/kafka/factory.go index d1b59194073..8af5c114987 100644 --- a/plugin/storage/kafka/factory.go +++ b/plugin/storage/kafka/factory.go @@ -11,6 +11,7 @@ import ( "github.com/Shopify/sarama" "github.com/spf13/viper" "go.uber.org/zap" + "golang.org/x/net/context" "github.com/jaegertracing/jaeger/pkg/kafka/producer" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -74,7 +75,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) default: return errors.New("kafka encoding is not one of '" + EncodingJSON + "' or '" + EncodingProto + "'") } - p, err := f.NewProducer(logger) + p, err := f.NewProducer(context.Background()) if err != nil { return err } @@ -105,6 +106,5 @@ func (f *Factory) Close() error { if f.producer != nil { errs = append(errs, f.producer.Close()) } - errs = append(errs, f.options.Config.TLS.Close()) return errors.Join(errs...) } diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index 10fa1df832f..27e1bd35d6b 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -5,6 +5,7 @@ package kafka import ( "bytes" + "context" "errors" "testing" @@ -26,7 +27,7 @@ type mockProducerBuilder struct { t *testing.T } -func (m *mockProducerBuilder) NewProducer(*zap.Logger) (sarama.AsyncProducer, error) { +func (m *mockProducerBuilder) NewProducer(_ context.Context) (sarama.AsyncProducer, error) { if m.err == nil { return mocks.NewAsyncProducer(m.t, nil), nil } diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index a2b10405e3b..28c83c1765c 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -11,9 +11,9 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) @@ -173,23 +173,23 @@ func TestTLSFlags(t *testing.T) { }{ { flags: []string{}, - expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: true}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=foo"}, - expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: true}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain}, }, } From 2c862f42766551d2cbce596322b83a65ed059a4c Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 19 Sep 2024 20:15:46 -0600 Subject: [PATCH 2/4] Fix Unit Tests Signed-off-by: Mahad Zaryab --- pkg/kafka/auth/config.go | 5 ++++- pkg/kafka/auth/config_test.go | 30 ++++++++++++------------------ pkg/kafka/auth/tls.go | 6 ++++-- pkg/kafka/consumer/config_test.go | 2 +- pkg/kafka/producer/config_test.go | 2 +- 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index a0b4e45ff31..fe818ceb1ec 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -5,6 +5,7 @@ package auth import ( "context" + "errors" "fmt" "strings" @@ -29,6 +30,8 @@ var authTypes = []string{ plaintext, } +var ErrUnsupportedAuthMethod = errors.New("unknown or unsupported authentication method to kafka cluster") + // AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster type AuthenticationConfig struct { Authentication string `mapstructure:"type"` @@ -60,7 +63,7 @@ func (config *AuthenticationConfig) SetConfiguration(ctx context.Context, sarama case plaintext: return setPlainTextConfiguration(&config.PlainText, saramaConfig) default: - return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication) + return fmt.Errorf("%w [type=%s]", ErrUnsupportedAuthMethod, config.Authentication) } } diff --git a/pkg/kafka/auth/config_test.go b/pkg/kafka/auth/config_test.go index d056d744251..7b3f09d7c89 100644 --- a/pkg/kafka/auth/config_test.go +++ b/pkg/kafka/auth/config_test.go @@ -93,51 +93,45 @@ func TestSetConfiguration(t *testing.T) { tests := []struct { name string authType string - expectedError string + expectedError error plainTextMechanisms []string }{ { name: "Invalid authentication method", authType: "fail", - expectedError: "Unknown/Unsupported authentication method fail to kafka cluster", + expectedError: ErrUnsupportedAuthMethod, }, { - name: "Kerberos authentication", - authType: "kerberos", - expectedError: "", + name: "Kerberos authentication", + authType: "kerberos", }, { name: "Plaintext authentication with SCRAM-SHA-256", authType: "plaintext", - expectedError: "", plainTextMechanisms: []string{"SCRAM-SHA-256"}, }, { name: "Plaintext authentication with SCRAM-SHA-512", authType: "plaintext", - expectedError: "", plainTextMechanisms: []string{"SCRAM-SHA-512"}, }, { name: "Plaintext authentication with PLAIN", authType: "plaintext", - expectedError: "", plainTextMechanisms: []string{"PLAIN"}, }, { - name: "No authentication", - authType: " ", - expectedError: "", + name: "No authentication", + authType: " ", }, { - name: "TLS authentication", - authType: "tls", - expectedError: "", + name: "TLS authentication", + authType: "tls", }, { name: "TLS authentication with invalid cipher suite", authType: "tls", - expectedError: "error loading tls config: failed to get cipher suite ids from cipher suite names: cipher suite fail not supported or doesn't exist", + expectedError: ErrLoadingTLSConfig, }, } @@ -150,7 +144,7 @@ func TestSetConfiguration(t *testing.T) { err := authConfig.InitFromViper(configPrefix, v) require.NoError(t, err) - if tt.authType == "tls" && tt.expectedError != "" { + if tt.authType == "tls" && tt.expectedError != nil { authConfig.TLS.CipherSuites = []string{"fail"} } @@ -160,8 +154,8 @@ func TestSetConfiguration(t *testing.T) { } } else { err = authConfig.SetConfiguration(context.Background(), saramaConfig) - if tt.expectedError != "" { - require.EqualError(t, err, tt.expectedError) + if tt.expectedError != nil { + require.ErrorIs(t, err, tt.expectedError) } else { require.NoError(t, err) } diff --git a/pkg/kafka/auth/tls.go b/pkg/kafka/auth/tls.go index 3e15433fc7f..db99698ddda 100644 --- a/pkg/kafka/auth/tls.go +++ b/pkg/kafka/auth/tls.go @@ -5,17 +5,19 @@ package auth import ( "context" - "fmt" + "errors" "github.com/Shopify/sarama" "go.opentelemetry.io/collector/config/configtls" ) +var ErrLoadingTLSConfig = errors.New("error loading tls config") + func setTLSConfiguration(ctx context.Context, config *configtls.ClientConfig, saramaConfig *sarama.Config) error { if !config.Insecure { tlsConfig, err := config.LoadTLSConfig(ctx) if err != nil { - return fmt.Errorf("error loading tls config: %w", err) + return errors.Join(ErrLoadingTLSConfig, err) } saramaConfig.Net.TLS.Enable = true saramaConfig.Net.TLS.Config = tlsConfig diff --git a/pkg/kafka/consumer/config_test.go b/pkg/kafka/consumer/config_test.go index 1228db1303f..a174de3c507 100644 --- a/pkg/kafka/consumer/config_test.go +++ b/pkg/kafka/consumer/config_test.go @@ -15,5 +15,5 @@ import ( func TestSetConfiguration(t *testing.T) { test := &Configuration{AuthenticationConfig: auth.AuthenticationConfig{Authentication: "fail"}} _, err := test.NewConsumer(context.Background()) - require.EqualError(t, err, "Unknown/Unsupported authentication method fail to kafka cluster") + require.ErrorIs(t, err, auth.ErrUnsupportedAuthMethod) } diff --git a/pkg/kafka/producer/config_test.go b/pkg/kafka/producer/config_test.go index acd6245b704..3c8141e6ce3 100644 --- a/pkg/kafka/producer/config_test.go +++ b/pkg/kafka/producer/config_test.go @@ -15,5 +15,5 @@ import ( func TestSetConfiguration(t *testing.T) { test := &Configuration{AuthenticationConfig: auth.AuthenticationConfig{Authentication: "fail"}} _, err := test.NewProducer(context.Background()) - require.EqualError(t, err, "Unknown/Unsupported authentication method fail to kafka cluster") + require.ErrorIs(t, err, auth.ErrUnsupportedAuthMethod) } From 46b5890499e12a7b3923b0c4edbb6c1944037586 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sat, 21 Sep 2024 00:38:29 -0400 Subject: [PATCH 3/4] Add More Groupings For Kafka Producer Config Signed-off-by: Mahad Zaryab --- cmd/jaeger/config-kafka-collector.yaml | 5 +- cmd/jaeger/config-kafka-ingester.yaml | 5 +- pkg/kafka/producer/config.go | 67 ++++++++++++++++++-------- pkg/kafka/producer/config_test.go | 2 +- plugin/storage/kafka/factory_test.go | 4 +- plugin/storage/kafka/options.go | 28 ++++++----- plugin/storage/kafka/options_test.go | 36 +++++++------- 7 files changed, 92 insertions(+), 55 deletions(-) diff --git a/cmd/jaeger/config-kafka-collector.yaml b/cmd/jaeger/config-kafka-collector.yaml index ac438f30022..248baddaef0 100644 --- a/cmd/jaeger/config-kafka-collector.yaml +++ b/cmd/jaeger/config-kafka-collector.yaml @@ -33,7 +33,8 @@ processors: exporters: kafka: - brokers: - - localhost:9092 + broker: + addresses: + - localhost:9092 topic: "jaeger-spans" encoding: otlp_proto diff --git a/cmd/jaeger/config-kafka-ingester.yaml b/cmd/jaeger/config-kafka-ingester.yaml index 9adfc308a9c..f0c40f88b4d 100644 --- a/cmd/jaeger/config-kafka-ingester.yaml +++ b/cmd/jaeger/config-kafka-ingester.yaml @@ -29,8 +29,9 @@ extensions: receivers: kafka: - brokers: - - localhost:9092 + broker: + addresses: + - localhost:9092 topic: "jaeger-spans" encoding: otlp_proto initial_offset: earliest diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 860c16e98cb..2e0d95a0325 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -19,30 +19,57 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { - Brokers []string `mapstructure:"brokers"` - RequiredAcks sarama.RequiredAcks `mapstructure:"required_acks"` - Compression sarama.CompressionCodec `mapstructure:"compression"` - CompressionLevel int `mapstructure:"compression_level"` - ProtocolVersion string `mapstructure:"protocol_version"` - BatchLinger time.Duration `mapstructure:"batch_linger"` - BatchSize int `mapstructure:"batch_size"` - BatchMinMessages int `mapstructure:"batch_min_messages"` - BatchMaxMessages int `mapstructure:"batch_max_messages"` - MaxMessageBytes int `mapstructure:"max_message_bytes"` - auth.AuthenticationConfig `mapstructure:"authentication"` + Authentication auth.AuthenticationConfig `mapstructure:"auth"` + Batch Batch `mapstructure:"batch"` + Broker Broker `mapstructure:"broker"` + Compression Compression `mapstructure:"compression"` + MaxMessageBytes int `mapstructure:"max_message_bytes"` + ProtocolVersion string `mapstructure:"protocol_version"` +} + +type Batch struct { + // Linger is the time interval to wait before sending records to the Kafka broker. + // A higher value will reduce the number requests to Kafka but increase latency and the possibility + // of data loss in case of process restart (see https://kafka.apache.org/documentation/"). + Linger time.Duration `mapstructure:"linger"` + // MaxMessages is the maximum number of message to batch before sending records to Kafka. + MaxMessages int `mapstructure:"max_messages"` + // MinMessages is the best-effort number of messages needed to send a batch of records to Kafka. + // A higher value will reduce the number requests to Kafka but increase latency and the possibility + // of data loss in case of process restart (see https://kafka.apache.org/documentation/"). + MinMessages int `mapstructure:"min_messages"` + // Size is the best-effort number of bytes needed to send a batch of records to Kafka. + // A higher value will reduce the number requests to Kafka but increase latency and the possibility + // of data loss in case of process restart (see https://kafka.apache.org/documentation/"). + Size int `mapstructure:"size"` +} + +type Broker struct { + // Addresses contains a list of the broker addresses. + Addresses []string `mapstructure:"addresses"` + // RequiredAcks tells the level of acknowledgement reliability needed from the + // broker when producing requests. + RequiredAcks sarama.RequiredAcks `mapstructure:"required_acks"` +} + +type Compression struct { + // Level contains the level of compression to use for messages. + Level int `mapstructure:"level"` + // Type contains the type of compression to use for messages. + Type sarama.CompressionCodec `mapstructure:"type"` } // NewProducer creates a new asynchronous kafka producer func (c *Configuration) NewProducer(ctx context.Context) (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() - saramaConfig.Producer.RequiredAcks = c.RequiredAcks - saramaConfig.Producer.Compression = c.Compression - saramaConfig.Producer.CompressionLevel = c.CompressionLevel + saramaConfig.Producer.RequiredAcks = c.Broker.RequiredAcks + saramaConfig.Producer.Compression = c.Compression.Type + saramaConfig.Producer.CompressionLevel = c.Compression.Level saramaConfig.Producer.Return.Successes = true - saramaConfig.Producer.Flush.Bytes = c.BatchSize - saramaConfig.Producer.Flush.Frequency = c.BatchLinger - saramaConfig.Producer.Flush.Messages = c.BatchMinMessages - saramaConfig.Producer.Flush.MaxMessages = c.BatchMaxMessages + saramaConfig.Producer.Flush.Bytes = c.Batch.Size + saramaConfig.Producer.Flush.Frequency = c.Batch.Linger + saramaConfig.Producer.Flush.Messages = c.Batch.MinMessages + saramaConfig.Producer.Flush.MaxMessages = c.Batch.MaxMessages saramaConfig.Producer.MaxMessageBytes = c.MaxMessageBytes if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) @@ -51,8 +78,8 @@ func (c *Configuration) NewProducer(ctx context.Context) (sarama.AsyncProducer, } saramaConfig.Version = ver } - if err := c.AuthenticationConfig.SetConfiguration(ctx, saramaConfig); err != nil { + if err := c.Authentication.SetConfiguration(ctx, saramaConfig); err != nil { return nil, err } - return sarama.NewAsyncProducer(c.Brokers, saramaConfig) + return sarama.NewAsyncProducer(c.Broker.Addresses, saramaConfig) } diff --git a/pkg/kafka/producer/config_test.go b/pkg/kafka/producer/config_test.go index 3c8141e6ce3..01d22a1fd83 100644 --- a/pkg/kafka/producer/config_test.go +++ b/pkg/kafka/producer/config_test.go @@ -13,7 +13,7 @@ import ( ) func TestSetConfiguration(t *testing.T) { - test := &Configuration{AuthenticationConfig: auth.AuthenticationConfig{Authentication: "fail"}} + test := &Configuration{Authentication: auth.AuthenticationConfig{Authentication: "fail"}} _, err := test.NewProducer(context.Background()) require.ErrorIs(t, err, auth.ErrUnsupportedAuthMethod) } diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index 27e1bd35d6b..a82bb8aeae8 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -150,7 +150,9 @@ func TestKafkaFactoryDoesNotLogPassword(t *testing.T) { func TestConfigureFromOptions(t *testing.T) { f := NewFactory() - o := Options{Topic: "testTopic", Config: kafkaConfig.Configuration{Brokers: []string{"host"}}} + o := Options{Topic: "testTopic", Config: kafkaConfig.Configuration{ + Broker: kafkaConfig.Broker{Addresses: []string{"host"}}, + }} f.configureFromOptions(o) assert.Equal(t, o, f.options) assert.Equal(t, &o.Config, f.Builder) diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index efa7e8b904a..ced7c772edc 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -193,17 +193,23 @@ func (opt *Options) InitFromViper(v *viper.Viper) { } opt.Config = producer.Configuration{ - Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), - RequiredAcks: requiredAcks, - Compression: compressionModeCodec, - CompressionLevel: compressionLevel, - ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), - AuthenticationConfig: authenticationOptions, - BatchLinger: v.GetDuration(configPrefix + suffixBatchLinger), - BatchSize: v.GetInt(configPrefix + suffixBatchSize), - BatchMinMessages: v.GetInt(configPrefix + suffixBatchMinMessages), - BatchMaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages), - MaxMessageBytes: v.GetInt(configPrefix + suffixMaxMessageBytes), + Broker: producer.Broker{ + Addresses: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), + RequiredAcks: requiredAcks, + }, + Compression: producer.Compression{ + Type: compressionModeCodec, + Level: compressionLevel, + }, + ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), + Authentication: authenticationOptions, + Batch: producer.Batch{ + Linger: v.GetDuration(configPrefix + suffixBatchLinger), + Size: v.GetInt(configPrefix + suffixBatchSize), + MinMessages: v.GetInt(configPrefix + suffixBatchMinMessages), + MaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages), + }, + MaxMessageBytes: v.GetInt(configPrefix + suffixMaxMessageBytes), } opt.Topic = v.GetString(configPrefix + suffixTopic) opt.Encoding = v.GetString(configPrefix + suffixEncoding) diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 28c83c1765c..901af3a34e2 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -36,16 +36,16 @@ func TestOptionsWithFlags(t *testing.T) { opts.InitFromViper(v) assert.Equal(t, "topic1", opts.Topic) - assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Config.Brokers) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Config.Broker.Addresses) assert.Equal(t, "protobuf", opts.Encoding) - assert.Equal(t, sarama.WaitForLocal, opts.Config.RequiredAcks) - assert.Equal(t, sarama.CompressionGZIP, opts.Config.Compression) - assert.Equal(t, 7, opts.Config.CompressionLevel) - assert.Equal(t, 128000, opts.Config.BatchSize) - assert.Equal(t, time.Duration(1*time.Second), opts.Config.BatchLinger) - assert.Equal(t, 50, opts.Config.BatchMinMessages) - assert.Equal(t, 100, opts.Config.BatchMaxMessages) - assert.Equal(t, 100, opts.Config.BatchMaxMessages) + assert.Equal(t, sarama.WaitForLocal, opts.Config.Broker.RequiredAcks) + assert.Equal(t, sarama.CompressionGZIP, opts.Config.Compression.Type) + assert.Equal(t, 7, opts.Config.Compression.Level) + assert.Equal(t, 128000, opts.Config.Batch.Size) + assert.Equal(t, time.Duration(1*time.Second), opts.Config.Batch.Linger) + assert.Equal(t, 50, opts.Config.Batch.MinMessages) + assert.Equal(t, 100, opts.Config.Batch.MaxMessages) + assert.Equal(t, 100, opts.Config.Batch.MaxMessages) assert.Equal(t, 10485760, opts.Config.MaxMessageBytes) } @@ -56,15 +56,15 @@ func TestFlagDefaults(t *testing.T) { opts.InitFromViper(v) assert.Equal(t, defaultTopic, opts.Topic) - assert.Equal(t, []string{defaultBroker}, opts.Config.Brokers) + assert.Equal(t, []string{defaultBroker}, opts.Config.Broker.Addresses) assert.Equal(t, defaultEncoding, opts.Encoding) - assert.Equal(t, sarama.WaitForLocal, opts.Config.RequiredAcks) - assert.Equal(t, sarama.CompressionNone, opts.Config.Compression) - assert.Equal(t, 0, opts.Config.CompressionLevel) - assert.Equal(t, 0, opts.Config.BatchSize) - assert.Equal(t, time.Duration(0*time.Second), opts.Config.BatchLinger) - assert.Equal(t, 0, opts.Config.BatchMinMessages) - assert.Equal(t, 0, opts.Config.BatchMaxMessages) + assert.Equal(t, sarama.WaitForLocal, opts.Config.Broker.RequiredAcks) + assert.Equal(t, sarama.CompressionNone, opts.Config.Compression.Type) + assert.Equal(t, 0, opts.Config.Compression.Level) + assert.Equal(t, 0, opts.Config.Batch.Size) + assert.Equal(t, time.Duration(0*time.Second), opts.Config.Batch.Linger) + assert.Equal(t, 0, opts.Config.Batch.MinMessages) + assert.Equal(t, 0, opts.Config.Batch.MaxMessages) assert.Equal(t, defaultMaxMessageBytes, opts.Config.MaxMessageBytes) } @@ -200,7 +200,7 @@ func TestTLSFlags(t *testing.T) { err := command.ParseFlags(test.flags) require.NoError(t, err) o.InitFromViper(v) - assert.Equal(t, test.expected, o.Config.AuthenticationConfig) + assert.Equal(t, test.expected, o.Config.Authentication) }) } } From 759bb423925788ecd64bd2ad91415bd98b6a3ecf Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sat, 21 Sep 2024 12:28:05 -0400 Subject: [PATCH 4/4] Revert Changes To Kafka E2E Config Signed-off-by: Mahad Zaryab --- cmd/jaeger/config-kafka-collector.yaml | 5 ++--- cmd/jaeger/config-kafka-ingester.yaml | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/cmd/jaeger/config-kafka-collector.yaml b/cmd/jaeger/config-kafka-collector.yaml index 248baddaef0..ac438f30022 100644 --- a/cmd/jaeger/config-kafka-collector.yaml +++ b/cmd/jaeger/config-kafka-collector.yaml @@ -33,8 +33,7 @@ processors: exporters: kafka: - broker: - addresses: - - localhost:9092 + brokers: + - localhost:9092 topic: "jaeger-spans" encoding: otlp_proto diff --git a/cmd/jaeger/config-kafka-ingester.yaml b/cmd/jaeger/config-kafka-ingester.yaml index f0c40f88b4d..9adfc308a9c 100644 --- a/cmd/jaeger/config-kafka-ingester.yaml +++ b/cmd/jaeger/config-kafka-ingester.yaml @@ -29,9 +29,8 @@ extensions: receivers: kafka: - broker: - addresses: - - localhost:9092 + brokers: + - localhost:9092 topic: "jaeger-spans" encoding: otlp_proto initial_offset: earliest