Skip to content

Commit

Permalink
[kafka-producer] Support setting max message size (jaegertracing#5263)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Exposes MaxMessageBytes config mentioned in
jaegertracing#1335

## Description of the changes
- Adds support to set MaxMessageBytes config on producer
- Also setting a default which matches with
[sarama](https://github.com/IBM/sarama/blob/main/config.go#L177) default
unclear if we need to set that default explicitly from jaeger producer
initialization

## How was this change tested?
- Tested on our development jaeger cluster. After seeing below error we
made following changes and built a new Image and verified that my
changes fixed the issue
```
{"level":"error","ts":1550003610.8029132,"caller":"kafka/writer.go:59","msg":"kafka server: Message was too large, server rejected it to avoid allocation error.","stacktrace":"[github.com/jaegertracing/jaeger/plugin/storage/kafka.NewSpanWriter.func2](http://github.com/jaegertracing/jaeger/plugin/storage/kafka.NewSpanWriter.func2)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/plugin/storage/kafka/writer.go:59"}
```

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Saketh kappala <43525626+sappusaketh@users.noreply.github.com>
Signed-off-by: Saketh <43525626+sappusaketh@users.noreply.github.com>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
  • Loading branch information
sappusaketh and yurishkuro authored Mar 9, 2024
1 parent 083806f commit bf84fc6
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Configuration struct {
BatchSize int `mapstructure:"batch_size"`
BatchMinMessages int `mapstructure:"batch_min_messages"`
BatchMaxMessages int `mapstructure:"batch_max_messages"`
MaxMessageBytes int `mapstructure:"max_message_bytes"`
auth.AuthenticationConfig `mapstructure:"authentication"`
}

Expand All @@ -53,6 +54,7 @@ func (c *Configuration) NewProducer(logger *zap.Logger) (sarama.AsyncProducer, e
saramaConfig.Producer.Flush.Frequency = c.BatchLinger
saramaConfig.Producer.Flush.Messages = c.BatchMinMessages
saramaConfig.Producer.Flush.MaxMessages = c.BatchMaxMessages
saramaConfig.Producer.MaxMessageBytes = c.MaxMessageBytes
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
suffixBatchSize = ".batch-size"
suffixBatchMinMessages = ".batch-min-messages"
suffixBatchMaxMessages = ".batch-max-messages"
suffixMaxMessageBytes = ".max-message-bytes"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
Expand All @@ -58,6 +59,7 @@ const (
defaultBatchSize = 0
defaultBatchMinMessages = 0
defaultBatchMaxMessages = 0
defaultMaxMessageBytes = 1000000 // https://github.com/IBM/sarama/blob/main/config.go#L177
)

var (
Expand Down Expand Up @@ -152,6 +154,11 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultBatchMaxMessages,
"(experimental) Maximum number of message to batch before sending records to Kafka",
)
flagSet.Int(
configPrefix+suffixMaxMessageBytes,
defaultMaxMessageBytes,
"(experimental) The maximum permitted size of a message. Should be set equal to or smaller than the broker's `message.max.bytes`.",
)
flagSet.String(
configPrefix+suffixBrokers,
defaultBroker,
Expand Down Expand Up @@ -207,6 +214,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
BatchSize: v.GetInt(configPrefix + suffixBatchSize),
BatchMinMessages: v.GetInt(configPrefix + suffixBatchMinMessages),
BatchMaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages),
MaxMessageBytes: v.GetInt(configPrefix + suffixMaxMessageBytes),
}
opt.Topic = v.GetString(configPrefix + suffixTopic)
opt.Encoding = v.GetString(configPrefix + suffixEncoding)
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.producer.batch-size=128000",
"--kafka.producer.batch-min-messages=50",
"--kafka.producer.batch-max-messages=100",
"--kafka.producer.max-message-bytes=10485760",
})
opts.InitFromViper(v)

Expand All @@ -55,6 +56,8 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, time.Duration(1*time.Second), opts.Config.BatchLinger)
assert.Equal(t, 50, opts.Config.BatchMinMessages)
assert.Equal(t, 100, opts.Config.BatchMaxMessages)
assert.Equal(t, 100, opts.Config.BatchMaxMessages)
assert.Equal(t, 10485760, opts.Config.MaxMessageBytes)
}

func TestFlagDefaults(t *testing.T) {
Expand All @@ -73,6 +76,7 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, time.Duration(0*time.Second), opts.Config.BatchLinger)
assert.Equal(t, 0, opts.Config.BatchMinMessages)
assert.Equal(t, 0, opts.Config.BatchMaxMessages)
assert.Equal(t, defaultMaxMessageBytes, opts.Config.MaxMessageBytes)
}

func TestCompressionLevelDefaults(t *testing.T) {
Expand Down

0 comments on commit bf84fc6

Please sign in to comment.