Skip to content

Commit

Permalink
feat: use config objects (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhck authored Nov 15, 2022
1 parent 100cf07 commit 56f0d7a
Show file tree
Hide file tree
Showing 16 changed files with 220 additions and 188 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy-docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- run: git config user.name github-actions
- run: git config user.email github-actions@github.com
- run: cd docs && npm ci && npm run deploy
- run: cd docs && npm ci
- uses: crazy-max/ghaction-github-pages@v2
with:
target_branch: gh-pages
Expand Down
75 changes: 75 additions & 0 deletions v2/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package config

import "github.com/confluentinc/confluent-kafka-go/kafka"

type KPConfig struct {
KafkaConfig Kafka
SchemaRegistryConfig SchemaRegistry
}

type Kafka struct {
ConsumerGroupName string
BootstrapServers string
SaslMechanism *string
SecurityProtocol *string
Username *string
Password *string
ConsumerSessionTimeoutMs *int
ConsumerAutoOffsetReset *string
}

func (s Kafka) WithDefaults() Kafka {
return Kafka{
ConsumerGroupName: s.ConsumerGroupName,
BootstrapServers: s.BootstrapServers,
SaslMechanism: s.SaslMechanism,
SecurityProtocol: s.SecurityProtocol,
Username: s.Username,
Password: s.Password,
ConsumerSessionTimeoutMs: defaultIfNil(s.ConsumerSessionTimeoutMs, 6000),
ConsumerAutoOffsetReset: defaultIfNil(s.ConsumerAutoOffsetReset, "earliest"),
}
}

type SchemaRegistry struct {
Endpoint string
Username string
Password string
}

func defaultIfNil[T any](value *T, defaultValue T) *T {
if value == nil {
return &defaultValue
}

return value
}

func GetKafkaConfig(kafkaConfig Kafka) *kafka.ConfigMap {
cfg := &kafka.ConfigMap{}

hydrateIfNotNil(cfg, "bootstrap.servers", &kafkaConfig.BootstrapServers)
hydrateIfNotNil(cfg, "sasl.mechanisms", kafkaConfig.SaslMechanism)
hydrateIfNotNil(cfg, "security.protocol", kafkaConfig.SecurityProtocol)
hydrateIfNotNil(cfg, "sasl.username", kafkaConfig.Username)
hydrateIfNotNil(cfg, "sasl.password", kafkaConfig.Password)

return cfg
}

func GetKafkaConsumerConfig(config Kafka) *kafka.ConfigMap {
cfg := GetKafkaConfig(config)
hydrateIfNotNil(cfg, "group.id", &config.ConsumerGroupName)
hydrateIfNotNil(cfg, "auto.offset.reset", config.ConsumerAutoOffsetReset)
hydrateIfNotNil(cfg, "session.timeout.ms", config.ConsumerSessionTimeoutMs)

return cfg
}

func hydrateIfNotNil[T any](cfg *kafka.ConfigMap, key string, value *T) {
if value == nil {
return
}
// looked at the source code, as of now, there's no error being returned, it's always nil
_ = cfg.SetKey(key, *value)
}
22 changes: 22 additions & 0 deletions v2/config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package config_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/honestbank/kp/v2/config"
)

func TestGetKafkaConfig(t *testing.T) {
withDefaults := config.GetKafkaConsumerConfig(config.Kafka{BootstrapServers: "localhost", ConsumerGroupName: "cg"}.WithDefaults())
bootstrapServers, err := withDefaults.Get("bootstrap.servers", "")
assert.NoError(t, err)
assert.Equal(t, "localhost", bootstrapServers)
autoOffsetResets, err := withDefaults.Get("auto.offset.reset", "")
assert.NoError(t, err)
assert.Equal(t, "earliest", autoOffsetResets)
consumerGroupId, err := withDefaults.Get("group.id", "")
assert.NoError(t, err)
assert.Equal(t, "cg", consumerGroupId)
}
1 change: 1 addition & 0 deletions v2/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type KafkaProcessor[MessageType any] interface {
WithRetryOrPanic(retryTopic string, retryCount int) KafkaProcessor[MessageType]
WithDeadletter(deadLetterTopic string) (KafkaProcessor[MessageType], error)
WithDeadletterOrPanic(deadletterTopic string) KafkaProcessor[MessageType]
OnKafkaErrors(cb func(err error))
Stop()
Run(processor Processor[MessageType]) error
}
54 changes: 0 additions & 54 deletions v2/internal/config/config.go

This file was deleted.

44 changes: 0 additions & 44 deletions v2/internal/config/config_test.go

This file was deleted.

11 changes: 3 additions & 8 deletions v2/internal/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package consumer
import (
"github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/honestbank/kp/v2/internal/config"
"github.com/honestbank/kp/v2/config"
)

type consumer struct {
Expand Down Expand Up @@ -35,13 +35,8 @@ func (c consumer) Commit(message *kafka.Message) error {
return err
}

func New(topics []string, consumerGroup string) (Consumer, error) {
cfg, err := config.LoadConfig[config.KafkaConfig]()
if err != nil {
return nil, err
}
kafkaConfig := config.GetKafkaConsumerConfig(*cfg)
_ = kafkaConfig.SetKey("group.id", consumerGroup)
func New(topics []string, cfg config.Kafka) (Consumer, error) {
kafkaConfig := config.GetKafkaConsumerConfig(cfg)
_ = kafkaConfig.SetKey("enable.auto.commit", false)
k, err := kafka.NewConsumer(kafkaConfig)
if err != nil {
Expand Down
46 changes: 35 additions & 11 deletions v2/internal/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"testing"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"

"github.com/honestbank/kp/v2/config"
"github.com/honestbank/kp/v2/internal/consumer"
"github.com/honestbank/kp/v2/producer"
)
Expand All @@ -18,12 +20,17 @@ type MyMsg struct {
}

func TestNew(t *testing.T) {
t.Setenv("KP_SCHEMA_REGISTRY_ENDPOINT", "http://localhost:8081")
t.Setenv("KP_KAFKA_BOOTSTRAP_SERVERS", "localhost")
setup()
kafkaConfig := config.Kafka{
BootstrapServers: "localhost",
ConsumerGroupName: "consumer-group-1",
}
schemaRegistryConfig := config.SchemaRegistry{Endpoint: "http://localhost:8081"}
kpConfig := config.KPConfig{KafkaConfig: kafkaConfig, SchemaRegistryConfig: schemaRegistryConfig}
t.Run("can read from kafka", func(t *testing.T) {
c, err := consumer.New([]string{"consumer-integration-topic-1"}, "consumer-group-1")
c, err := consumer.New([]string{"consumer-integration-topic-1"}, kafkaConfig.WithDefaults())
assert.NoError(t, err)
p1, err := producer.New[MyMsg]("consumer-integration-topic-1")
p1, err := producer.New[MyMsg]("consumer-integration-topic-1", kpConfig)
assert.NoError(t, err)
shouldContinue, numberOfMessage := true, 0
go func() {
Expand All @@ -50,11 +57,13 @@ func TestNew(t *testing.T) {
assert.Equal(t, 3, numberOfMessage)
})
t.Run("can read from multiple topics", func(t *testing.T) {
c, err := consumer.New([]string{"consumer-integration-topic-2", "consumer-integration-topic-3"}, "consumer-group-2")
cfg := kafkaConfig.WithDefaults()
cfg.ConsumerGroupName = "int-test-1"
c, err := consumer.New([]string{"consumer-integration-topic-2", "consumer-integration-topic-3"}, cfg)
assert.NoError(t, err)
p1, err := producer.New[MyMsg]("consumer-integration-topic-2")
p1, err := producer.New[MyMsg]("consumer-integration-topic-2", kpConfig)
assert.NoError(t, err)
p2, err := producer.New[MyMsg]("consumer-integration-topic-3")
p2, err := producer.New[MyMsg]("consumer-integration-topic-3", kpConfig)
assert.NoError(t, err)
shouldContinue, numberOfMessage := true, 0
go func() {
Expand All @@ -80,15 +89,30 @@ func TestNew(t *testing.T) {
assert.Equal(t, 3, numberOfMessage)
})
t.Run("returns error if config is invalid", func(t *testing.T) {
t.Setenv("KP_SCHEMA_REGISTRY_ENDPOINT", "")
t.Setenv("KP_KAFKA_BOOTSTRAP_SERVERS", "")
c, err := consumer.New([]string{}, "")
c, err := consumer.New([]string{}, kafkaConfig.WithDefaults())
assert.Error(t, err)
assert.Nil(t, c)
})
t.Run("returns error if there's no topic", func(t *testing.T) {
c, err := consumer.New([]string{}, "")
c, err := consumer.New([]string{}, kafkaConfig.WithDefaults())
assert.Error(t, err)
assert.Nil(t, c)
})
}
func setup() {
cfg := config.KPConfig{KafkaConfig: config.Kafka{BootstrapServers: "localhost"}, SchemaRegistryConfig: config.SchemaRegistry{Endpoint: "http://localhost:8081"}}
c, err := kafka.NewAdminClient(config.GetKafkaConfig(cfg.KafkaConfig))
if err != nil {
panic(err)
}
_, err = c.CreateTopics(context.Background(),
[]kafka.TopicSpecification{
{Topic: "consumer-integration-topic-2", ReplicationFactor: 1, NumPartitions: 1},
{Topic: "consumer-integration-topic-3", ReplicationFactor: 1, NumPartitions: 1},
{Topic: "user-logged-in-rewards-processor-dlt", ReplicationFactor: 1, NumPartitions: 1},
},
)
if err != nil {
panic(err)
}
}
12 changes: 4 additions & 8 deletions v2/internal/schemaregistry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"github.com/confluentinc/confluent-kafka-go/schemaregistry"
"github.com/heetch/avro"

"github.com/honestbank/kp/v2/internal/config"
"github.com/honestbank/kp/v2/config"
)

func getClient() (schemaregistry.Client, error) {
cfg, err := config.LoadConfig[Config]()
if err != nil {
return nil, err
}
func getClient(cfg config.SchemaRegistry) (schemaregistry.Client, error) {
client, err := schemaregistry.NewClient(schemaregistry.NewConfigWithAuthentication(
cfg.Endpoint,
cfg.Username,
Expand All @@ -26,8 +22,8 @@ func getClient() (schemaregistry.Client, error) {
return client, nil
}

func Publish[MessageType any](topicName string) (*int, error) {
client, err := getClient()
func Publish[MessageType any](topicName string, cfg config.SchemaRegistry) (*int, error) {
client, err := getClient(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 2 additions & 4 deletions v2/internal/schemaregistry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
package schemaregistry_test

import (
"os"
"testing"

"github.com/stretchr/testify/assert"

"github.com/honestbank/kp/v2/config"
"github.com/honestbank/kp/v2/internal/schemaregistry"
)

Expand All @@ -17,8 +17,6 @@ type BenchmarkMessage struct {
}

func TestPublish(t *testing.T) {
os.Setenv("KP_SCHEMA_REGISTRY_ENDPOINT", "http://localhost:8081")
defer os.Unsetenv("KP_SCHEMA_REGISTRY_ENDPOINT")
_, err := schemaregistry.Publish[BenchmarkMessage]("topic-kp")
_, err := schemaregistry.Publish[BenchmarkMessage]("topic-kp", config.SchemaRegistry{Endpoint: "http://localhost:8081"})
assert.NoError(t, err)
}
Loading

0 comments on commit 56f0d7a

Please sign in to comment.