Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Align Kafka Storage Config With OTEL #6003

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package builder

import (
"context"
"fmt"
"strings"

Expand Down Expand Up @@ -50,7 +51,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
RackID: options.RackID,
FetchMaxMessageBytes: options.FetchMaxMessageBytes,
}
saramaConsumer, err := consumerConfig.NewConsumer(logger)
saramaConsumer, err := consumerConfig.NewConsumer(context.Background())
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/kafka/auth"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
Expand Down Expand Up @@ -56,23 +56,23 @@ func TestTLSFlags(t *testing.T) {
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: true}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: true}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{Insecure: false}, PlainText: plain},
},
}

Expand Down
3 changes: 0 additions & 3 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ func main() {
consumer.Start()

svc.RunAndThen(func() {
if err := options.TLS.Close(); err != nil {
logger.Error("Failed to close TLS certificates watcher", zap.Error(err))
}
if err = consumer.Close(); err != nil {
logger.Error("Failed to close consumer", zap.Error(err))
}
Expand Down
27 changes: 16 additions & 11 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
package auth

import (
"context"
"errors"
"fmt"
"strings"

"github.com/Shopify/sarama"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
)
Expand All @@ -28,22 +30,24 @@ var authTypes = []string{
plaintext,
}

var ErrUnsupportedAuthMethod = errors.New("unknown or unsupported authentication method to kafka cluster")

// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster
type AuthenticationConfig struct {
Authentication string `mapstructure:"type"`
Kerberos KerberosConfig `mapstructure:"kerberos"`
TLS tlscfg.Options `mapstructure:"tls"`
PlainText PlainTextConfig `mapstructure:"plaintext"`
Authentication string `mapstructure:"type"`
Kerberos KerberosConfig `mapstructure:"kerberos"`
TLS configtls.ClientConfig `mapstructure:"tls"`
PlainText PlainTextConfig `mapstructure:"plaintext"`
}

// SetConfiguration set configure authentication into sarama config structure
func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config, logger *zap.Logger) error {
func (config *AuthenticationConfig) SetConfiguration(ctx context.Context, saramaConfig *sarama.Config) error {
authentication := strings.ToLower(config.Authentication)
if strings.Trim(authentication, " ") == "" {
authentication = none
}
if config.Authentication == tls || config.TLS.Enabled {
err := setTLSConfiguration(&config.TLS, saramaConfig, logger)
if config.Authentication == tls || !config.TLS.Insecure {
err := setTLSConfiguration(ctx, &config.TLS, saramaConfig)
if err != nil {
return err
}
Expand All @@ -59,7 +63,7 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config
case plaintext:
return setPlainTextConfiguration(&config.PlainText, saramaConfig)
default:
return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication)
return fmt.Errorf("%w [type=%s]", ErrUnsupportedAuthMethod, config.Authentication)
}
}

Expand All @@ -80,13 +84,14 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.
}

var err error
config.TLS, err = tlsClientConfig.InitFromViper(v)
tlsCfg, err := tlsClientConfig.InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to process Kafka TLS options: %w", err)
}
if config.Authentication == tls {
config.TLS.Enabled = true
tlsCfg.Enabled = true
}
config.TLS = tlsCfg.ToOtelClientConfig()

config.PlainText.Username = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUsername)
config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword)
Expand Down
49 changes: 20 additions & 29 deletions pkg/kafka/auth/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
package auth

import (
"context"
"flag"
"testing"

"github.com/Shopify/sarama"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
)

func addFlags(flags *flag.FlagSet) {
Expand Down Expand Up @@ -64,8 +63,8 @@ func Test_InitFromViper(t *testing.T) {
KeyTabPath: "/path/to/keytab",
DisablePAFXFast: true,
},
TLS: tlscfg.Options{
Enabled: true,
TLS: configtls.ClientConfig{
Insecure: false,
},
PlainText: PlainTextConfig{
Username: "user",
Expand All @@ -77,16 +76,15 @@ func Test_InitFromViper(t *testing.T) {
}

// Test plaintext with different mechanisms
func testPlaintext(v *viper.Viper, t *testing.T, configPrefix string, logger *zap.Logger, mechanism string, saramaConfig *sarama.Config) {
func testPlaintext(ctx context.Context, v *viper.Viper, t *testing.T, configPrefix string, mechanism string, saramaConfig *sarama.Config) {
v.Set(configPrefix+plainTextPrefix+suffixPlainTextMechanism, mechanism)
authConfig := &AuthenticationConfig{}
err := authConfig.InitFromViper(configPrefix, v)
require.NoError(t, err)
require.NoError(t, authConfig.SetConfiguration(saramaConfig, logger))
require.NoError(t, authConfig.SetConfiguration(ctx, saramaConfig))
}

func TestSetConfiguration(t *testing.T) {
logger := zaptest.NewLogger(t)
saramaConfig := sarama.NewConfig()
configPrefix := "kafka.auth"
v, command := config.Viperize(addFlags)
Expand All @@ -95,51 +93,45 @@ func TestSetConfiguration(t *testing.T) {
tests := []struct {
name string
authType string
expectedError string
expectedError error
plainTextMechanisms []string
}{
{
name: "Invalid authentication method",
authType: "fail",
expectedError: "Unknown/Unsupported authentication method fail to kafka cluster",
expectedError: ErrUnsupportedAuthMethod,
},
{
name: "Kerberos authentication",
authType: "kerberos",
expectedError: "",
name: "Kerberos authentication",
authType: "kerberos",
},
{
name: "Plaintext authentication with SCRAM-SHA-256",
authType: "plaintext",
expectedError: "",
plainTextMechanisms: []string{"SCRAM-SHA-256"},
},
{
name: "Plaintext authentication with SCRAM-SHA-512",
authType: "plaintext",
expectedError: "",
plainTextMechanisms: []string{"SCRAM-SHA-512"},
},
{
name: "Plaintext authentication with PLAIN",
authType: "plaintext",
expectedError: "",
plainTextMechanisms: []string{"PLAIN"},
},
{
name: "No authentication",
authType: " ",
expectedError: "",
name: "No authentication",
authType: " ",
},
{
name: "TLS authentication",
authType: "tls",
expectedError: "",
name: "TLS authentication",
authType: "tls",
},
{
name: "TLS authentication with invalid cipher suite",
authType: "tls",
expectedError: "error loading tls config: failed to get cipher suite ids from cipher suite names: cipher suite fail not supported or doesn't exist",
expectedError: ErrLoadingTLSConfig,
},
}

Expand All @@ -149,22 +141,21 @@ func TestSetConfiguration(t *testing.T) {
"--kafka.auth.authentication=" + tt.authType,
})
authConfig := &AuthenticationConfig{}
defer authConfig.TLS.Close()
err := authConfig.InitFromViper(configPrefix, v)
require.NoError(t, err)

if tt.authType == "tls" && tt.expectedError != "" {
if tt.authType == "tls" && tt.expectedError != nil {
authConfig.TLS.CipherSuites = []string{"fail"}
}

if len(tt.plainTextMechanisms) > 0 {
for _, mechanism := range tt.plainTextMechanisms {
testPlaintext(v, t, configPrefix, logger, mechanism, saramaConfig)
testPlaintext(context.Background(), v, t, configPrefix, mechanism, saramaConfig)
}
} else {
err = authConfig.SetConfiguration(saramaConfig, logger)
if tt.expectedError != "" {
require.EqualError(t, err, tt.expectedError)
err = authConfig.SetConfiguration(context.Background(), saramaConfig)
if tt.expectedError != nil {
require.ErrorIs(t, err, tt.expectedError)
} else {
require.NoError(t, err)
}
Expand Down
17 changes: 9 additions & 8 deletions pkg/kafka/auth/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@
package auth

import (
"fmt"
"context"
"errors"

"github.com/Shopify/sarama"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"go.opentelemetry.io/collector/config/configtls"
)

func setTLSConfiguration(config *tlscfg.Options, saramaConfig *sarama.Config, logger *zap.Logger) error {
if config.Enabled {
tlsConfig, err := config.Config(logger)
var ErrLoadingTLSConfig = errors.New("error loading tls config")

func setTLSConfiguration(ctx context.Context, config *configtls.ClientConfig, saramaConfig *sarama.Config) error {
if !config.Insecure {
tlsConfig, err := config.LoadTLSConfig(ctx)
if err != nil {
return fmt.Errorf("error loading tls config: %w", err)
return errors.Join(ErrLoadingTLSConfig, err)
}
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = tlsConfig
Expand Down
13 changes: 5 additions & 8 deletions pkg/kafka/auth/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,22 @@
package auth

import (
"context"
"testing"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"go.opentelemetry.io/collector/config/configtls"
)

func TestSetTLSConfiguration(t *testing.T) {
logger := zaptest.NewLogger(t)
saramaConfig := sarama.NewConfig()
tlsConfig := &tlscfg.Options{
Enabled: true,
tlsConfig := &configtls.ClientConfig{
Insecure: false,
}
err := setTLSConfiguration(tlsConfig, saramaConfig, logger)
err := setTLSConfiguration(context.Background(), tlsConfig, saramaConfig)
require.NoError(t, err)
assert.True(t, saramaConfig.Net.TLS.Enable)
assert.NotNil(t, saramaConfig.Net.TLS.Config)
defer tlsConfig.Close()
}
6 changes: 3 additions & 3 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package consumer

import (
"context"
"io"
"time"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
)
Expand Down Expand Up @@ -42,7 +42,7 @@ type Configuration struct {
}

// NewConsumer creates a new kafka consumer
func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) {
func (c *Configuration) NewConsumer(ctx context.Context) (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
Expand All @@ -55,7 +55,7 @@ func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) {
}
saramaConfig.Config.Version = ver
}
if err := c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config, logger); err != nil {
if err := c.AuthenticationConfig.SetConfiguration(ctx, &saramaConfig.Config); err != nil {
return nil, err
}
// cluster.NewConfig() uses sarama.NewConfig() to create the config.
Expand Down
7 changes: 3 additions & 4 deletions pkg/kafka/consumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@
package consumer

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
)

func TestSetConfiguration(t *testing.T) {
logger := zaptest.NewLogger(t)
test := &Configuration{AuthenticationConfig: auth.AuthenticationConfig{Authentication: "fail"}}
_, err := test.NewConsumer(logger)
require.EqualError(t, err, "Unknown/Unsupported authentication method fail to kafka cluster")
_, err := test.NewConsumer(context.Background())
require.ErrorIs(t, err, auth.ErrUnsupportedAuthMethod)
}
Loading
Loading