diff --git a/api-docs/docs/interfaces/ReaderConfig.md b/api-docs/docs/interfaces/ReaderConfig.md index 2ed88a0..34af2c9 100644 --- a/api-docs/docs/interfaces/ReaderConfig.md +++ b/api-docs/docs/interfaces/ReaderConfig.md @@ -148,7 +148,7 @@ ### maxWait -• **maxWait**: `number` +• **maxWait**: `string` #### Defined in diff --git a/api-docs/index.d.ts b/api-docs/index.d.ts index 05f1763..755398c 100644 --- a/api-docs/index.d.ts +++ b/api-docs/index.d.ts @@ -177,7 +177,7 @@ export interface ReaderConfig { minBytes: number; maxBytes: number; readBatchTimeout: number; - maxWait: number; + maxWait: string; readLagInterval: number; groupBalancers: GROUP_BALANCERS[]; heartbeatInterval: number; diff --git a/reader.go b/reader.go index e08a8c6..010d326 100644 --- a/reader.go +++ b/reader.go @@ -57,7 +57,7 @@ type ReaderConfig struct { Brokers []string `json:"brokers"` GroupTopics []string `json:"groupTopics"` GroupBalancers []string `json:"groupBalancers"` - MaxWait time.Duration `json:"maxWait"` + MaxWait Duration `json:"maxWait"` ReadBatchTimeout time.Duration `json:"readBatchTimeout"` ReadLagInterval time.Duration `json:"readLagInterval"` HeartbeatInterval time.Duration `json:"heartbeatInterval"` @@ -78,6 +78,33 @@ type ConsumeConfig struct { Limit int64 `json:"limit"` } +type Duration struct { + time.Duration +} + +func (d Duration) MarshalJSON() ([]byte, error) { + return json.Marshal(d.String()) +} + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + + switch value := v.(type) { + case string: + var err error + d.Duration, err = time.ParseDuration(value) + if err != nil { + return err + } + return nil + default: + return errors.New("invalid duration") + } +} + // readerClass is a wrapper around kafkago.reader and acts as a JS constructor // for this extension, thus it must be called with new operator, e.g. new Reader(...). // nolint: funlen @@ -227,7 +254,7 @@ func (k *Kafka) reader(readerConfig *ReaderConfig) *kafkago.Reader { QueueCapacity: readerConfig.QueueCapacity, MinBytes: readerConfig.MinBytes, MaxBytes: readerConfig.MaxBytes, - MaxWait: readerConfig.MaxWait, + MaxWait: readerConfig.MaxWait.Duration, ReadBatchTimeout: readerConfig.ReadBatchTimeout, ReadLagInterval: readerConfig.ReadLagInterval, GroupBalancers: groupBalancers, @@ -297,9 +324,12 @@ func (k *Kafka) consume( messages := make([]map[string]interface{}, 0) - for i := int64(0); i < consumeConfig.Limit; i++ { - msg, err := reader.ReadMessage(ctx) + maxWait := reader.Config().MaxWait + for i := int64(0); i < consumeConfig.Limit; i++ { + ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait) + msg, err := reader.ReadMessage(ctxWithTimeout) + cancel() if errors.Is(err, io.EOF) { k.reportReaderStats(reader.Stats()) diff --git a/reader_test.go b/reader_test.go index da9b080..eac05e6 100644 --- a/reader_test.go +++ b/reader_test.go @@ -3,13 +3,51 @@ package kafka import ( "encoding/json" "testing" + "time" "github.com/dop251/goja" "github.com/riferrei/srclient" + kafkago "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// TestConsumerMaxWaitExceeded tests the consume function when no messages are sent. +// The reader should not hang +func TestConsumerMaxWaitExceeded(t *testing.T) { + test := getTestModuleInstance(t) + writer := test.newWriter("test-topic") + defer writer.Close() + + // Create a reader to consume messages. + assert.NotPanics(t, func() { + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "test-topic", + MaxWait: Duration{time.Second * 3}, + }) + assert.NotNil(t, reader) + defer reader.Close() + + // Switch to VU code. + require.NoError(t, test.moveToVUCode()) + + // Consume a message in the VU function. + assert.NotPanics(t, func() { + messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) + assert.Empty(t, messages) + }) + }) + + // Check if no message was consumed. + metricsValues := test.getCounterMetricsValues() + assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderDials.Name]) + assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderErrors.Name]) + assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderBytes.Name]) + assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderMessages.Name]) + assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderRebalances.Name]) +} + // TestConsume tests the consume function. // nolint: funlen func TestConsume(t *testing.T) { @@ -297,11 +335,17 @@ func TestReaderClass(t *testing.T) { map[string]interface{}{ "brokers": []string{"localhost:9092"}, "topic": "test-reader-class", + "maxWait": "3s", }, ), }, }) assert.NotNil(t, reader) + this := reader.Get("This").Export().(*kafkago.Reader) + assert.NotNil(t, this) + assert.Equal(t, this.Config().Brokers, []string{"localhost:9092"}) + assert.Equal(t, this.Config().Topic, "test-reader-class") + assert.Equal(t, this.Config().MaxWait, time.Second*3) consume := reader.Get("consume").Export().(func(goja.FunctionCall) goja.Value) messages := consume(goja.FunctionCall{ diff --git a/scripts/test_timeout.js b/scripts/test_timeout.js new file mode 100644 index 0000000..57c202f --- /dev/null +++ b/scripts/test_timeout.js @@ -0,0 +1,59 @@ +/* + +This is a k6 test script that imports the xk6-kafka and + + +*/ + +import { check } from "k6"; +// import * as kafka from "k6/x/kafka"; +import { Reader, Connection } from "k6/x/kafka"; // import kafka extension + +// Prints module-level constants +// console.log(kafka); + +const brokers = ["localhost:9092"]; +const topic = "xk6_kafka_json_topic"; + +const reader = new Reader({ + brokers: brokers, + topic: topic, + maxWait: "5s", +}); + +const connection = new Connection({ + address: brokers[0], +}); + +if (__VU === 0) { + connection.createTopic({ topic: topic }); +} + +export const options = { + thresholds: { + // Base thresholds to see if the writer or reader is working + kafka_writer_error_count: ["count == 0"], + kafka_reader_error_count: ["count == 0"], + }, + duration: "11s", +}; + +export default function () { + // Read 10 messages only + let messages = reader.consume({ limit: 10 }); + + console.log("continuing execution"); + + check(messages, { + "10 messages are received": (messages) => messages.length === 10, + }); +} + +export function teardown(data) { + if (__VU === 0) { + // Delete the topic + connection.deleteTopic(topic); + } + reader.close(); + connection.close(); +}