Skip to content

Commit

Permalink
Adds timeout mechanism in reader with maxWait (#246)
Browse files Browse the repository at this point in the history
* Adds timeout mechanism in reader with maxWait
* Adds comment to test behavior
* Run prettier on the example scripts
* MaxWait is now a string
* Update tests

---------

Co-authored-by: Mostafa Moradian <mostafamoradian0@gmail.com>
  • Loading branch information
JasmineCA and mostafa authored Sep 14, 2023
1 parent aae567b commit 82d039f
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 6 deletions.
2 changes: 1 addition & 1 deletion api-docs/docs/interfaces/ReaderConfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@

### maxWait

**maxWait**: `number`
**maxWait**: `string`

#### Defined in

Expand Down
2 changes: 1 addition & 1 deletion api-docs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export interface ReaderConfig {
minBytes: number;
maxBytes: number;
readBatchTimeout: number;
maxWait: number;
maxWait: string;
readLagInterval: number;
groupBalancers: GROUP_BALANCERS[];
heartbeatInterval: number;
Expand Down
38 changes: 34 additions & 4 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ReaderConfig struct {
Brokers []string `json:"brokers"`
GroupTopics []string `json:"groupTopics"`
GroupBalancers []string `json:"groupBalancers"`
MaxWait time.Duration `json:"maxWait"`
MaxWait Duration `json:"maxWait"`
ReadBatchTimeout time.Duration `json:"readBatchTimeout"`
ReadLagInterval time.Duration `json:"readLagInterval"`
HeartbeatInterval time.Duration `json:"heartbeatInterval"`
Expand All @@ -78,6 +78,33 @@ type ConsumeConfig struct {
Limit int64 `json:"limit"`
}

type Duration struct {
time.Duration
}

func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(d.String())
}

func (d *Duration) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}

switch value := v.(type) {
case string:
var err error
d.Duration, err = time.ParseDuration(value)
if err != nil {
return err
}
return nil
default:
return errors.New("invalid duration")
}
}

// readerClass is a wrapper around kafkago.reader and acts as a JS constructor
// for this extension, thus it must be called with new operator, e.g. new Reader(...).
// nolint: funlen
Expand Down Expand Up @@ -227,7 +254,7 @@ func (k *Kafka) reader(readerConfig *ReaderConfig) *kafkago.Reader {
QueueCapacity: readerConfig.QueueCapacity,
MinBytes: readerConfig.MinBytes,
MaxBytes: readerConfig.MaxBytes,
MaxWait: readerConfig.MaxWait,
MaxWait: readerConfig.MaxWait.Duration,
ReadBatchTimeout: readerConfig.ReadBatchTimeout,
ReadLagInterval: readerConfig.ReadLagInterval,
GroupBalancers: groupBalancers,
Expand Down Expand Up @@ -297,9 +324,12 @@ func (k *Kafka) consume(

messages := make([]map[string]interface{}, 0)

for i := int64(0); i < consumeConfig.Limit; i++ {
msg, err := reader.ReadMessage(ctx)
maxWait := reader.Config().MaxWait

for i := int64(0); i < consumeConfig.Limit; i++ {
ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait)
msg, err := reader.ReadMessage(ctxWithTimeout)
cancel()
if errors.Is(err, io.EOF) {
k.reportReaderStats(reader.Stats())

Expand Down
44 changes: 44 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,51 @@ package kafka
import (
"encoding/json"
"testing"
"time"

"github.com/dop251/goja"
"github.com/riferrei/srclient"
kafkago "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestConsumerMaxWaitExceeded tests the consume function when no messages are sent.
// The reader should not hang
func TestConsumerMaxWaitExceeded(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
defer writer.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
MaxWait: Duration{time.Second * 3},
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

// Consume a message in the VU function.
assert.NotPanics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
})
})

// Check if no message was consumed.
metricsValues := test.getCounterMetricsValues()
assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderDials.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderErrors.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderBytes.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderMessages.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderRebalances.Name])
}

// TestConsume tests the consume function.
// nolint: funlen
func TestConsume(t *testing.T) {
Expand Down Expand Up @@ -297,11 +335,17 @@ func TestReaderClass(t *testing.T) {
map[string]interface{}{
"brokers": []string{"localhost:9092"},
"topic": "test-reader-class",
"maxWait": "3s",
},
),
},
})
assert.NotNil(t, reader)
this := reader.Get("This").Export().(*kafkago.Reader)
assert.NotNil(t, this)
assert.Equal(t, this.Config().Brokers, []string{"localhost:9092"})
assert.Equal(t, this.Config().Topic, "test-reader-class")
assert.Equal(t, this.Config().MaxWait, time.Second*3)

consume := reader.Get("consume").Export().(func(goja.FunctionCall) goja.Value)
messages := consume(goja.FunctionCall{
Expand Down
59 changes: 59 additions & 0 deletions scripts/test_timeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
This is a k6 test script that imports the xk6-kafka and
*/

import { check } from "k6";
// import * as kafka from "k6/x/kafka";
import { Reader, Connection } from "k6/x/kafka"; // import kafka extension

// Prints module-level constants
// console.log(kafka);

const brokers = ["localhost:9092"];
const topic = "xk6_kafka_json_topic";

const reader = new Reader({
brokers: brokers,
topic: topic,
maxWait: "5s",
});

const connection = new Connection({
address: brokers[0],
});

if (__VU === 0) {
connection.createTopic({ topic: topic });
}

export const options = {
thresholds: {
// Base thresholds to see if the writer or reader is working
kafka_writer_error_count: ["count == 0"],
kafka_reader_error_count: ["count == 0"],
},
duration: "11s",
};

export default function () {
// Read 10 messages only
let messages = reader.consume({ limit: 10 });

console.log("continuing execution");

check(messages, {
"10 messages are received": (messages) => messages.length === 10,
});
}

export function teardown(data) {
if (__VU === 0) {
// Delete the topic
connection.deleteTopic(topic);
}
reader.close();
connection.close();
}

0 comments on commit 82d039f

Please sign in to comment.