Feature: Add SASL authentication support (#7)
victorlcm authored Aug 26, 2021
1 parent 17c4400 commit 5dcf974
Showing 7 changed files with 312 additions and 39 deletions.
10 changes: 2 additions & 8 deletions go.mod
@@ -3,20 +3,14 @@ module github.com/k6io/xk6-output-kafka
go 1.16

require (
github.com/Shopify/sarama v1.16.0
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20160609142408-bb955e01b934 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/Shopify/sarama v1.29.1
github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
github.com/kelseyhightower/envconfig v1.4.0
github.com/kubernetes/helm v2.9.0+incompatible
github.com/mitchellh/mapstructure v1.1.2
github.com/pierrec/lz4 v1.0.2-0.20171218195038-2fcda4cb7018 // indirect
github.com/pierrec/xxHash v0.1.5 // indirect
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 // indirect
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/xdg/scram v1.0.3
go.k6.io/k6 v0.33.1-0.20210701133132-aa1fd6a389c9
gopkg.in/guregu/null.v3 v3.3.0
)
67 changes: 51 additions & 16 deletions go.sum

Large diffs are not rendered by default.

56 changes: 50 additions & 6 deletions pkg/kafka/collector.go
@@ -21,7 +21,9 @@
package kafka

import (
"crypto/tls"
"encoding/json"
"errors"
"sync"
"time"

@@ -51,7 +53,48 @@ func New(p output.Params) (*Collector, error) {
if err != nil {
return nil, err
}
producer, err := sarama.NewSyncProducer(conf.Brokers, nil)

saramaConfig := sarama.NewConfig()

if conf.AuthMechanism.String != "none" {
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.Handshake = true
saramaConfig.Net.SASL.User = conf.User.String
saramaConfig.Net.SASL.Password = conf.Password.String

if conf.SSL {
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: conf.Insecure,
}

}

switch conf.AuthMechanism.String {
case "plain":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
case "scram-sha-512":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &xDGSCRAMClient{HashGeneratorFcn: SHA512} }
case "scram-sha-256":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &xDGSCRAMClient{HashGeneratorFcn: SHA256} }
default:
return nil, errors.New("invalid auth mechanism for kafka SASL")
}
}

saramaConfig.Producer.Return.Successes = true
version, err := sarama.ParseKafkaVersion(conf.Version.String)

if err != nil {
return nil, err
}

saramaConfig.Version = version

producer, err := sarama.NewSyncProducer(conf.Brokers, saramaConfig)

if err != nil {
return nil, err
}
@@ -71,24 +114,25 @@ func (c *Collector) Description() string {
func (c *Collector) Stop() error {
c.done <- struct{}{}
<-c.done

err := c.Producer.Close()
if err != nil {
c.logger.WithError(err).Error("Kafka: Failed to close producer.")
}
return nil
}

func (c *Collector) Start() error {
c.logger.Debug("Kafka: starting!")
go func() {
ticker := time.NewTicker(time.Duration(c.Config.PushInterval.Duration))
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.pushMetrics()
case <-c.done:
c.pushMetrics()

err := c.Producer.Close()
if err != nil {
c.logger.WithError(err).Error("Kafka: Failed to close producer.")
}
close(c.done)
return
}
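The xDGSCRAMClient, SHA256, and SHA512 helpers referenced in the scram-sha-256 and scram-sha-512 branches of New above are not defined in any hunk rendered on this page; they presumably come from one of the other changed files in this commit. As a point of reference only, a minimal sketch of the conventional sarama SCRAM adapter built on github.com/xdg/scram (the dependency this commit adds to go.mod) would look roughly like this:

package kafka

import (
	"crypto/sha256"
	"crypto/sha512"
	"hash"

	"github.com/xdg/scram"
)

// Hash generator functions matching the SHA256/SHA512 names used in collector.go.
var (
	SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
	SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
)

// xDGSCRAMClient adapts github.com/xdg/scram to the sarama.SCRAMClient interface.
type xDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

// Begin starts a new SCRAM conversation for the given credentials.
func (x *xDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
	x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
	if err != nil {
		return err
	}
	x.ClientConversation = x.Client.NewConversation()
	return nil
}

// Step advances the conversation with the broker's latest challenge.
func (x *xDGSCRAMClient) Step(challenge string) (response string, err error) {
	return x.ClientConversation.Step(challenge)
}

// Done reports whether the SCRAM handshake has completed.
func (x *xDGSCRAMClient) Done() bool {
	return x.ClientConversation.Done()
}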
7 changes: 5 additions & 2 deletions pkg/kafka/collector_test.go
@@ -23,6 +23,7 @@ package kafka
import (
"encoding/json"
"fmt"
"os"
"testing"

"github.com/Shopify/sarama"
@@ -40,12 +41,14 @@ func TestRun(t *testing.T) {
coordinator := sarama.NewMockBroker(t, 2)
seedMeta := new(sarama.MetadataResponse)
seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, sarama.ErrNoError)
seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, []int32{}, sarama.ErrNoError)
broker.Returns(seedMeta)

os.Clearenv()

c, err := New(output.Params{
Logger: testutils.NewLogger(t),
JSONConfig: json.RawMessage(fmt.Sprintf(`{"brokers":[%q], "topic": "my_topic"}`, broker.Addr())),
JSONConfig: json.RawMessage(fmt.Sprintf(`{"brokers":[%q], "topic": "my_topic", "version": "0.8.2.0"}`, broker.Addr())),
})

require.NoError(t, err)
81 changes: 74 additions & 7 deletions pkg/kafka/config.go
@@ -22,8 +22,10 @@ package kafka

import (
"encoding/json"
"errors"
"time"

"github.com/Shopify/sarama"
"github.com/kelseyhightower/envconfig"
"github.com/kubernetes/helm/pkg/strvals"
"github.com/mitchellh/mapstructure"
@@ -38,22 +40,34 @@ type Config struct {
Brokers []string `json:"brokers" envconfig:"K6_KAFKA_BROKERS"`

// Samples.
Topic null.String `json:"topic" envconfig:"K6_KAFKA_TOPIC"`
Format null.String `json:"format" envconfig:"K6_KAFKA_FORMAT"`
PushInterval types.NullDuration `json:"push_interval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`
Topic null.String `json:"topic" envconfig:"K6_KAFKA_TOPIC"`
User null.String `json:"user" envconfig:"K6_KAFKA_SASL_USER"`
Password null.String `json:"password" envconfig:"K6_KAFKA_SASL_PASSWORD"`
AuthMechanism null.String `json:"auth_mechanism" envconfig:"K6_KAFKA_AUTH_MECHANISM"`
Format null.String `json:"format" envconfig:"K6_KAFKA_FORMAT"`
PushInterval types.NullDuration `json:"push_interval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`
Version null.String `json:"version" envconfig:"K6_KAFKA_VERSION"`
SSL bool `json:"ssl" envconfig:"K6_KAFKA_SSL"`
Insecure bool `json:"insecure" envconfig:"K6_KAFKA_INSECURE"`

InfluxDBConfig influxdbConfig `json:"influxdb"`
}

// config is a duplicate of ConfigFields as we can not mapstructure.Decode into
// null types so we duplicate the struct with primitive types to Decode into
type config struct {
Brokers []string `json:"brokers" mapstructure:"brokers" envconfig:"K6_KAFKA_BROKERS"`
Topic string `json:"topic" mapstructure:"topic" envconfig:"K6_KAFKA_TOPIC"`
Format string `json:"format" mapstructure:"format" envconfig:"K6_KAFKA_FORMAT"`
PushInterval string `json:"push_interval" mapstructure:"push_interval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`
Brokers []string `json:"brokers" mapstructure:"brokers" envconfig:"K6_KAFKA_BROKERS"`
Topic string `json:"topic" mapstructure:"topic" envconfig:"K6_KAFKA_TOPIC"`
Format string `json:"format" mapstructure:"format" envconfig:"K6_KAFKA_FORMAT"`
PushInterval string `json:"push_interval" mapstructure:"push_interval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`
User string `json:"user" mapstructure:"user" envconfig:"K6_KAFKA_SASL_USER"`
Password string `json:"password" mapstructure:"password" envconfig:"K6_KAFKA_SASL_PASSWORD"`
AuthMechanism string `json:"auth_mechanism" mapstructure:"auth_mechanism" envconfig:"K6_KAFKA_AUTH_MECHANISM"`

InfluxDBConfig influxdbConfig `json:"influxdb" mapstructure:"influxdb"`
Version string `json:"version" mapstructure:"version"`
SSL bool `json:"ssl" mapstructure:"ssl"`
Insecure bool `json:"insecure" mapstructure:"insecure"`
}

// NewConfig creates a new Config instance with default values for some fields.
@@ -62,6 +76,10 @@ func NewConfig() Config {
Format: null.StringFrom("json"),
PushInterval: types.NullDurationFrom(1 * time.Second),
InfluxDBConfig: newInfluxdbConfig(),
AuthMechanism: null.StringFrom("none"),
Version: null.StringFrom(sarama.DefaultVersion.String()),
SSL: false,
Insecure: false,
}
}

@@ -78,6 +96,26 @@ func (c Config) Apply(cfg Config) Config {
if cfg.PushInterval.Valid {
c.PushInterval = cfg.PushInterval
}
if cfg.AuthMechanism.Valid {
c.AuthMechanism = cfg.AuthMechanism
}
if cfg.User.Valid {
c.User = cfg.User
}
if cfg.Password.Valid {
c.Password = cfg.Password
}
if cfg.Version.Valid {
c.Version = cfg.Version
}
if cfg.SSL {
c.SSL = cfg.SSL
}

if c.Insecure {
c.Insecure = cfg.Insecure
}

c.InfluxDBConfig = c.InfluxDBConfig.Apply(cfg.InfluxDBConfig)
return c
}
@@ -111,6 +149,30 @@ func ParseArg(arg string) (Config, error) {
}
}

if v, ok := params["version"].(string); ok {
c.Version = null.StringFrom(v)
}

if v, ok := params["ssl"].(bool); ok {
c.SSL = v
}

if v, ok := params["skip_cert_verify"].(bool); ok {
c.Insecure = v
}

if v, ok := params["auth_mechanism"].(string); ok {
c.AuthMechanism = null.StringFrom(v)
}

if v, ok := params["user"].(string); ok {
c.User = null.StringFrom(v)
}

if v, ok := params["password"].(string); ok {
c.Password = null.StringFrom(v)
}

var cfg config
err = mapstructure.Decode(params, &cfg)
if err != nil {
@@ -141,8 +203,13 @@ func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, arg string) (Config, error) {
// TODO: get rid of envconfig and actually use the env parameter...
return result, err
}

result = result.Apply(envConfig)

if result.AuthMechanism.String != "none" && (!result.User.Valid || !result.Password.Valid) {
return result, errors.New("user and password are required when auth mechanism is provided")
}

if arg != "" {
urlConf, err := ParseArg(arg)
if err != nil {
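As a usage illustration only (not part of the commit), the sketch below shows how the new SASL settings flow from environment variables into the consolidated config; the K6_KAFKA_* names are the envconfig tags declared in config.go above, the values are made up, and the demoSASLFromEnv helper is hypothetical:

package kafka

import (
	"encoding/json"
	"fmt"
	"os"
)

// demoSASLFromEnv is a hypothetical helper showing the new validation path.
func demoSASLFromEnv() {
	os.Setenv("K6_KAFKA_AUTH_MECHANISM", "scram-sha-512")
	os.Setenv("K6_KAFKA_SASL_USER", "testuser")
	os.Setenv("K6_KAFKA_SASL_PASSWORD", "password123")
	os.Setenv("K6_KAFKA_SSL", "true")

	cfg, err := GetConsolidatedConfig(json.RawMessage(`{}`), nil, "")
	if err != nil {
		// Unsetting K6_KAFKA_SASL_USER or K6_KAFKA_SASL_PASSWORD above would trigger:
		// "user and password are required when auth mechanism is provided"
		fmt.Println(err)
		return
	}
	fmt.Println(cfg.AuthMechanism.String, cfg.User.String, cfg.SSL)
}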
93 changes: 93 additions & 0 deletions pkg/kafka/config_test.go
@@ -26,6 +26,7 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"
@@ -67,6 +68,36 @@ func TestConfigParseArg(t *testing.T) {
assert.Equal(t, null.StringFrom("someTopic"), c.Topic)
assert.Equal(t, null.StringFrom("influxdb"), c.Format)
assert.Equal(t, expInfluxConfig, c.InfluxDBConfig)

c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic,format=json,auth_mechanism=SASL_PLAINTEXT,user=johndoe,password=123password")
assert.Nil(t, err)
assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers)
assert.Equal(t, null.StringFrom("someTopic"), c.Topic)
assert.Equal(t, null.StringFrom("json"), c.Format)
assert.Equal(t, null.StringFrom("SASL_PLAINTEXT"), c.AuthMechanism)
assert.Equal(t, null.StringFrom("johndoe"), c.User)
assert.Equal(t, null.StringFrom("123password"), c.Password)
assert.Equal(t, false, c.SSL)

c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic,format=json,auth_mechanism=SASL_PLAINTEXT,user=johndoe,password=123password,ssl=false")
assert.Nil(t, err)
assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers)
assert.Equal(t, null.StringFrom("someTopic"), c.Topic)
assert.Equal(t, null.StringFrom("json"), c.Format)
assert.Equal(t, null.StringFrom("SASL_PLAINTEXT"), c.AuthMechanism)
assert.Equal(t, null.StringFrom("johndoe"), c.User)
assert.Equal(t, null.StringFrom("123password"), c.Password)
assert.Equal(t, false, c.SSL)

c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic,format=json,auth_mechanism=SASL_PLAINTEXT,user=johndoe,password=123password,ssl=true")
assert.Nil(t, err)
assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers)
assert.Equal(t, null.StringFrom("someTopic"), c.Topic)
assert.Equal(t, null.StringFrom("json"), c.Format)
assert.Equal(t, null.StringFrom("SASL_PLAINTEXT"), c.AuthMechanism)
assert.Equal(t, null.StringFrom("johndoe"), c.User)
assert.Equal(t, null.StringFrom("123password"), c.Password)
assert.Equal(t, true, c.SSL)
}

func TestConsolidatedConfig(t *testing.T) {
@@ -80,11 +111,73 @@ func TestConsolidatedConfig(t *testing.T) {
err string
}{
"default": {
env: map[string]string{
"K6_KAFKA_AUTH_MECHANISM": "none",
},
config: Config{
Format: null.StringFrom("json"),
PushInterval: types.NullDurationFrom(1 * time.Second),
InfluxDBConfig: newInfluxdbConfig(),
AuthMechanism: null.StringFrom("none"),
Version: null.StringFrom(sarama.DefaultVersion.String()),
},
},
"auth": {
env: map[string]string{
"K6_KAFKA_AUTH_MECHANISM": "scram-sha-512",
"K6_KAFKA_SASL_PASSWORD": "password123",
"K6_KAFKA_SASL_USER": "testuser",
},
config: Config{
Format: null.StringFrom("json"),
PushInterval: types.NullDurationFrom(1 * time.Second),
InfluxDBConfig: newInfluxdbConfig(),
AuthMechanism: null.StringFrom("scram-sha-512"),
Password: null.StringFrom("password123"),
User: null.StringFrom("testuser"),
Version: null.StringFrom(sarama.DefaultVersion.String()),
},
},
"auth-missing-credentials": {
env: map[string]string{
"K6_KAFKA_AUTH_MECHANISM": "scram-sha-512",
},
config: Config{
Format: null.StringFrom("json"),
PushInterval: types.NullDurationFrom(1 * time.Second),
InfluxDBConfig: newInfluxdbConfig(),
AuthMechanism: null.StringFrom("scram-sha-512"),
Version: null.StringFrom(sarama.DefaultVersion.String()),
},
err: "user and password are required when auth mechanism is provided",
},
"auth-missing-user": {
env: map[string]string{
"K6_KAFKA_AUTH_MECHANISM": "scram-sha-512",
"K6_KAFKA_SASL_PASSWORD": "password123",
},
config: Config{
Format: null.StringFrom("json"),
PushInterval: types.NullDurationFrom(1 * time.Second),
InfluxDBConfig: newInfluxdbConfig(),
AuthMechanism: null.StringFrom("scram-sha-512"),
Version: null.StringFrom(sarama.DefaultVersion.String()),
},
err: "user and password are required when auth mechanism is provided",
},
"auth-missing-password": {
env: map[string]string{
"K6_KAFKA_AUTH_MECHANISM": "scram-sha-512",
"K6_KAFKA_SASL_USER": "testuser",
},
config: Config{
Format: null.StringFrom("json"),
PushInterval: types.NullDurationFrom(1 * time.Second),
InfluxDBConfig: newInfluxdbConfig(),
AuthMechanism: null.StringFrom("scram-sha-512"),
Version: null.StringFrom(sarama.DefaultVersion.String()),
},
err: "user and password are required when auth mechanism is provided",
},
}

(The diff for the remaining changed file is not rendered.)
