Skip to content

Commit

Permalink
Merge branch 'master' into fix/sqlaggregate-index
Browse files Browse the repository at this point in the history
  • Loading branch information
tbuchaillot authored Aug 1, 2023
2 parents 84cb091 + 91dd8a0 commit 862882e
Showing 1 changed file with 28 additions and 5 deletions.
33 changes: 28 additions & 5 deletions pumps/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"crypto/tls"
"encoding/json"
"os"
"strconv"
"time"

"github.com/TykTechnologies/tyk-pump/analytics"
Expand Down Expand Up @@ -39,8 +41,8 @@ type KafkaConf struct {
ClientId string `json:"client_id" mapstructure:"client_id"`
// The topic that the writer will produce messages to.
Topic string `json:"topic" mapstructure:"topic"`
// Timeout is the maximum amount of time will wait for a connect or write to complete.
Timeout time.Duration `json:"timeout" mapstructure:"timeout"`
// Timeout is the maximum amount of seconds to wait for a connect or write to complete.
Timeout interface{} `json:"timeout" mapstructure:"timeout"`
// Enable "github.com/golang/snappy" codec to be used to compress Kafka messages. By default
// is `false`.
Compressed bool `json:"compressed" mapstructure:"compressed"`
Expand Down Expand Up @@ -90,6 +92,10 @@ func (k *KafkaPump) Init(config interface{}) error {
}

processPumpEnvVars(k, k.log, k.kafkaConf, kafkaDefaultENV)
// This interface field is not reached by envconfig library, that's why we manually check it
if os.Getenv("TYK_PMP_PUMPS_KAFKA_META_TIMEOUT") != "" {
k.kafkaConf.Timeout = os.Getenv("TYK_PMP_PUMPS_KAFKA_META_TIMEOUT")
}

var tlsConfig *tls.Config
if k.kafkaConf.UseSSL {
Expand Down Expand Up @@ -137,9 +143,26 @@ func (k *KafkaPump) Init(config interface{}) error {
k.log.WithField("SASL-Mechanism", k.kafkaConf.SASLMechanism).Warn("Tyk pump doesn't support this SASL mechanism.")
}

// Timeout is an interface type to allow both time.Duration and float values
var timeout time.Duration
switch v := k.kafkaConf.Timeout.(type) {
case string:
timeout, err = time.ParseDuration(v) // i.e: when timeout is '1s'
if err != nil {
floatValue, floatErr := strconv.ParseFloat(v, 64) // i.e: when timeout is '1'
if floatErr != nil {
k.log.Fatal("Failed to parse timeout: ", floatErr)
} else {
timeout = time.Duration(floatValue * float64(time.Second))
}
}
case float64:
timeout = time.Duration(v) * time.Second // i.e: when timeout is 1
}

//Kafka writer connection config
dialer := &kafka.Dialer{
Timeout: k.kafkaConf.Timeout * time.Second,
Timeout: timeout,
ClientID: k.kafkaConf.ClientId,
TLS: tlsConfig,
SASLMechanism: mechanism,
Expand All @@ -149,8 +172,8 @@ func (k *KafkaPump) Init(config interface{}) error {
k.writerConfig.Topic = k.kafkaConf.Topic
k.writerConfig.Balancer = &kafka.LeastBytes{}
k.writerConfig.Dialer = dialer
k.writerConfig.WriteTimeout = k.kafkaConf.Timeout * time.Second
k.writerConfig.ReadTimeout = k.kafkaConf.Timeout * time.Second
k.writerConfig.WriteTimeout = timeout
k.writerConfig.ReadTimeout = timeout
if k.kafkaConf.Compressed {
k.writerConfig.CompressionCodec = snappy.NewCompressionCodec()
}
Expand Down

0 comments on commit 862882e

Please sign in to comment.