diff --git a/README.md b/README.md index 2e417c6..9ba2020 100644 --- a/README.md +++ b/README.md @@ -65,15 +65,15 @@ Feel free to skip the first two steps if you already have Go installed. 2. Install the latest version of Go using gvm. You need Go 1.4 installed for bootstrapping into higher Go versions, as explained [here](https://github.com/moovweb/gvm#a-note-on-compiling-go-15). 3. Install `xk6`: - ```shell - go install go.k6.io/xk6/cmd/xk6@latest - ``` + ```shell + go install go.k6.io/xk6/cmd/xk6@latest + ``` 4. Build the binary: - ```shell - xk6 build --with github.com/mostafa/xk6-kafka@latest - ``` + ```shell + xk6 build --with github.com/mostafa/xk6-kafka@latest + ``` > **Note** > You can always use the latest version of k6 to build the extension, but the earliest version of k6 that supports extensions via xk6 is v0.32.0. The xk6 is constantly evolving, so some APIs may not be backward compatible. @@ -101,27 +101,27 @@ I recommend the [fast-data-dev](https://github.com/lensesio/fast-data-dev) Docke 1. Run the Kafka environment and expose the ports: - ```bash - sudo docker run \ - --detach --rm \ - --name lensesio \ - -p 2181:2181 \ - -p 3030:3030 \ - -p 8081-8083:8081-8083 \ - -p 9581-9585:9581-9585 \ - -p 9092:9092 \ - -e ADV_HOST=127.0.0.1 \ - -e RUN_TESTS=0 \ - lensesio/fast-data-dev:latest - ``` + ```bash + sudo docker run \ + --detach --rm \ + --name lensesio \ + -p 2181:2181 \ + -p 3030:3030 \ + -p 8081-8083:8081-8083 \ + -p 9581-9585:9581-9585 \ + -p 9092:9092 \ + -e ADV_HOST=127.0.0.1 \ + -e RUN_TESTS=0 \ + lensesio/fast-data-dev:latest + ``` 2. After running the command, visit [localhost:3030](http://localhost:3030) to get into the fast-data-dev environment. 3. You can run the command to see the container logs: - ```bash - sudo docker logs -f -t lensesio - ``` + ```bash + sudo docker logs -f -t lensesio + ``` > **Note:** > If you have errors running the Kafka development environment, refer to the [fast-data-dev documentation](https://github.com/lensesio/fast-data-dev). @@ -139,197 +139,197 @@ The example scripts are available as `test_.js` with more code a 1. To use the extension, you need to import it in your script, like any other JS module: - ```javascript - // Either import the module object - import * as kafka from "k6/x/kafka"; - - // Or individual classes and constants - import { - Writer, - Reader, - Connection, - SchemaRegistry, - SCHEMA_TYPE_STRING, - } from "k6/x/kafka"; - ``` + ```javascript + // Either import the module object + import * as kafka from "k6/x/kafka"; + + // Or individual classes and constants + import { + Writer, + Reader, + Connection, + SchemaRegistry, + SCHEMA_TYPE_STRING, + } from "k6/x/kafka"; + ``` 2. You need to instantiate the classes in the `init` context. All the [k6 options](https://k6.io/docs/using-k6/k6-options/) are also configured here: - ```javascript - // Creates a new Writer object to produce messages to Kafka - const writer = new Writer({ - // WriterConfig object - brokers: ["localhost:9092"], - topic: "my-topic", - }); - - const reader = new Reader({ - // ReaderConfig object - brokers: ["localhost:9092"], - topic: "my-topic", - }); - - const connection = new Connection({ - // ConnectionConfig object - address: "localhost:9092", - }); - - const schemaRegistry = new SchemaRegistry(); - // Can accept a SchemaRegistryConfig object - - if (__VU == 0) { - // Create a topic on initialization (before producing messages) - connection.createTopic({ - // TopicConfig object - topic: "my-topic", - }); - } - ``` + ```javascript + // Creates a new Writer object to produce messages to Kafka + const writer = new Writer({ + // WriterConfig object + brokers: ["localhost:9092"], + topic: "my-topic", + }); + + const reader = new Reader({ + // ReaderConfig object + brokers: ["localhost:9092"], + topic: "my-topic", + }); + + const connection = new Connection({ + // ConnectionConfig object + address: "localhost:9092", + }); + + const schemaRegistry = new SchemaRegistry(); + // Can accept a SchemaRegistryConfig object + + if (__VU == 0) { + // Create a topic on initialization (before producing messages) + connection.createTopic({ + // TopicConfig object + topic: "my-topic", + }); + } + ``` 3. In the VU code, you can produce messages to Kafka or consume messages from it: - ```javascript - export default function () { - // Fetch the list of all topics - const topics = connection.listTopics(); - console.log(topics); // list of topics - - // Produces message to Kafka - writer.produce({ - // ProduceConfig object - messages: [ - // Message object(s) - { - key: schemaRegistry.serialize({ - data: "my-key", - schemaType: SCHEMA_TYPE_STRING, - }), - value: schemaRegistry.serialize({ - data: "my-value", - schemaType: SCHEMA_TYPE_STRING, - }), - }, - ], - }); - - // Consume messages from Kafka - let messages = reader.consume({ - // ConsumeConfig object - limit: 10, - }); - - // your messages - console.log(messages); - - // You can use checks to verify the contents, - // length and other properties of the message(s) - - // To serialize the data back into a string, you should use - // the deserialize method of the Schema Registry client. You - // can use it inside a check, as shown in the example scripts. - let deserializedValue = schemaRegistry.deserialize({ - data: messages[0].value, - schemaType: SCHEMA_TYPE_STRING, - }); - } - ``` + ```javascript + export default function () { + // Fetch the list of all topics + const topics = connection.listTopics(); + console.log(topics); // list of topics + + // Produces message to Kafka + writer.produce({ + // ProduceConfig object + messages: [ + // Message object(s) + { + key: schemaRegistry.serialize({ + data: "my-key", + schemaType: SCHEMA_TYPE_STRING, + }), + value: schemaRegistry.serialize({ + data: "my-value", + schemaType: SCHEMA_TYPE_STRING, + }), + }, + ], + }); + + // Consume messages from Kafka + let messages = reader.consume({ + // ConsumeConfig object + limit: 10, + }); + + // your messages + console.log(messages); + + // You can use checks to verify the contents, + // length and other properties of the message(s) + + // To serialize the data back into a string, you should use + // the deserialize method of the Schema Registry client. You + // can use it inside a check, as shown in the example scripts. + let deserializedValue = schemaRegistry.deserialize({ + data: messages[0].value, + schemaType: SCHEMA_TYPE_STRING, + }); + } + ``` 4. In the `teardown` function, close all the connections and possibly delete the topic: - ```javascript - export function teardown(data) { - // Delete the topic - connection.deleteTopic("my-topic"); - - // Close all connections - writer.close(); - reader.close(); - connection.close(); - } - ``` + ```javascript + export function teardown(data) { + // Delete the topic + connection.deleteTopic("my-topic"); + + // Close all connections + writer.close(); + reader.close(); + connection.close(); + } + ``` 5. You can now run k6 with the extension using the following command: - ```bash - ./k6 run --vus 50 --duration 60s scripts/test_json.js - ``` + ```bash + ./k6 run --vus 50 --duration 60s scripts/test_json.js + ``` 6. And here's the test result output: - ```bash + ```bash - /\ |‾‾| /‾‾/ /‾‾/ - /\ / \ | |/ / / / - / \/ \ | ( / ‾‾\ + /\ |‾‾| /‾‾/ /‾‾/ + /\ / \ | |/ / / / + / \/ \ | ( / ‾‾\ / \ | |\ \ | (‾) | - / __________ \ |__| \__\ \_____/ .io - - execution: local - script: scripts/test_json.js - output: - - - scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop): - * default: 50 looping VUs for 1m0s (gracefulStop: 30s) - - - running (1m04.4s), 00/50 VUs, 20170 complete and 0 interrupted iterations - default ✓ [======================================] 50 VUs 1m0s - - ✓ 10 messages are received - ✓ Topic equals to xk6_kafka_json_topic - ✓ Key contains key/value and is JSON - ✓ Value contains key/value and is JSON - ✓ Header equals {'mykey': 'myvalue'} - ✓ Time is past - ✓ Partition is zero - ✓ Offset is gte zero - ✓ High watermark is gte zero - - █ teardown - - checks.........................: 100.00% ✓ 181530 ✗ 0 - data_received..................: 0 B 0 B/s - data_sent......................: 0 B 0 B/s - iteration_duration.............: avg=153.45ms min=6.01ms med=26.8ms max=8.14s p(90)=156.3ms p(95)=206.4ms - iterations.....................: 20170 313.068545/s - kafka_reader_dial_count........: 50 0.776075/s - kafka_reader_dial_seconds......: avg=171.22µs min=0s med=0s max=1.09s p(90)=0s p(95)=0s - ✓ kafka_reader_error_count.......: 0 0/s - kafka_reader_fetch_bytes_max...: 1000000 min=1000000 max=1000000 - kafka_reader_fetch_bytes_min...: 1 min=1 max=1 - kafka_reader_fetch_wait_max....: 200ms min=200ms max=200ms - kafka_reader_fetch_bytes.......: 58 MB 897 kB/s - kafka_reader_fetch_size........: 147167 2284.25179/s - kafka_reader_fetches_count.....: 107 1.6608/s - kafka_reader_lag...............: 1519055 min=0 max=2436190 - kafka_reader_message_bytes.....: 40 MB 615 kB/s - kafka_reader_message_count.....: 201749 3131.446006/s - kafka_reader_offset............: 4130 min=11 max=5130 - kafka_reader_queue_capacity....: 1 min=1 max=1 - kafka_reader_queue_length......: 1 min=0 max=1 - kafka_reader_read_seconds......: avg=96.5ms min=0s med=0s max=59.37s p(90)=0s p(95)=0s - kafka_reader_rebalance_count...: 0 0/s - kafka_reader_timeouts_count....: 57 0.884725/s - kafka_reader_wait_seconds......: avg=102.71µs min=0s med=0s max=85.71ms p(90)=0s p(95)=0s - kafka_writer_acks_required.....: 0 min=0 max=0 - kafka_writer_async.............: 0.00% ✓ 0 ✗ 2017000 - kafka_writer_attempts_max......: 0 min=0 max=0 - kafka_writer_batch_bytes.......: 441 MB 6.8 MB/s - kafka_writer_batch_max.........: 1 min=1 max=1 - kafka_writer_batch_size........: 2017000 31306.854525/s - kafka_writer_batch_timeout.....: 0s min=0s max=0s - ✓ kafka_writer_error_count.......: 0 0/s - kafka_writer_message_bytes.....: 883 MB 14 MB/s - kafka_writer_message_count.....: 4034000 62613.709051/s - kafka_writer_read_timeout......: 0s min=0s max=0s - kafka_writer_retries_count.....: 0 0/s - kafka_writer_wait_seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s - kafka_writer_write_count.......: 4034000 62613.709051/s - kafka_writer_write_seconds.....: avg=523.21µs min=4.84µs med=14.48µs max=4.05s p(90)=33.85µs p(95)=42.68µs - kafka_writer_write_timeout.....: 0s min=0s max=0s - vus............................: 7 min=7 max=50 - vus_max........................: 50 min=50 max=50 - ``` + / __________ \ |__| \__\ \_____/ .io + + execution: local + script: scripts/test_json.js + output: - + + scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop): + * default: 50 looping VUs for 1m0s (gracefulStop: 30s) + + + running (1m04.4s), 00/50 VUs, 20170 complete and 0 interrupted iterations + default ✓ [======================================] 50 VUs 1m0s + + ✓ 10 messages are received + ✓ Topic equals to xk6_kafka_json_topic + ✓ Key contains key/value and is JSON + ✓ Value contains key/value and is JSON + ✓ Header equals {'mykey': 'myvalue'} + ✓ Time is past + ✓ Partition is zero + ✓ Offset is gte zero + ✓ High watermark is gte zero + + █ teardown + + checks.........................: 100.00% ✓ 181530 ✗ 0 + data_received..................: 0 B 0 B/s + data_sent......................: 0 B 0 B/s + iteration_duration.............: avg=153.45ms min=6.01ms med=26.8ms max=8.14s p(90)=156.3ms p(95)=206.4ms + iterations.....................: 20170 313.068545/s + kafka_reader_dial_count........: 50 0.776075/s + kafka_reader_dial_seconds......: avg=171.22µs min=0s med=0s max=1.09s p(90)=0s p(95)=0s + ✓ kafka_reader_error_count.......: 0 0/s + kafka_reader_fetch_bytes_max...: 1000000 min=1000000 max=1000000 + kafka_reader_fetch_bytes_min...: 1 min=1 max=1 + kafka_reader_fetch_wait_max....: 200ms min=200ms max=200ms + kafka_reader_fetch_bytes.......: 58 MB 897 kB/s + kafka_reader_fetch_size........: 147167 2284.25179/s + kafka_reader_fetches_count.....: 107 1.6608/s + kafka_reader_lag...............: 1519055 min=0 max=2436190 + kafka_reader_message_bytes.....: 40 MB 615 kB/s + kafka_reader_message_count.....: 201749 3131.446006/s + kafka_reader_offset............: 4130 min=11 max=5130 + kafka_reader_queue_capacity....: 1 min=1 max=1 + kafka_reader_queue_length......: 1 min=0 max=1 + kafka_reader_read_seconds......: avg=96.5ms min=0s med=0s max=59.37s p(90)=0s p(95)=0s + kafka_reader_rebalance_count...: 0 0/s + kafka_reader_timeouts_count....: 57 0.884725/s + kafka_reader_wait_seconds......: avg=102.71µs min=0s med=0s max=85.71ms p(90)=0s p(95)=0s + kafka_writer_acks_required.....: 0 min=0 max=0 + kafka_writer_async.............: 0.00% ✓ 0 ✗ 2017000 + kafka_writer_attempts_max......: 0 min=0 max=0 + kafka_writer_batch_bytes.......: 441 MB 6.8 MB/s + kafka_writer_batch_max.........: 1 min=1 max=1 + kafka_writer_batch_size........: 2017000 31306.854525/s + kafka_writer_batch_timeout.....: 0s min=0s max=0s + ✓ kafka_writer_error_count.......: 0 0/s + kafka_writer_message_bytes.....: 883 MB 14 MB/s + kafka_writer_message_count.....: 4034000 62613.709051/s + kafka_writer_read_timeout......: 0s min=0s max=0s + kafka_writer_retries_count.....: 0 0/s + kafka_writer_wait_seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s + kafka_writer_write_count.......: 4034000 62613.709051/s + kafka_writer_write_seconds.....: avg=523.21µs min=4.84µs med=14.48µs max=4.05s p(90)=33.85µs p(95)=42.68µs + kafka_writer_write_timeout.....: 0s min=0s max=0s + vus............................: 7 min=7 max=50 + vus_max........................: 50 min=50 max=50 + ``` ### Emitted Metrics @@ -377,57 +377,57 @@ The example scripts are available as `test_.js` with more code a 1. Why do I receive `Error writing messages`? - There are a few reasons why this might happen. The most prominent one is that the topic might not exist, which causes the producer to fail to send messages to a non-existent topic. You can use `Connection.createTopic` method to create the topic in Kafka, as shown in `scripts/test_topics.js`. You can also set the `autoCreateTopic` on the `WriterConfig`. You can also create a topic using the `kafka-topics` command: + There are a few reasons why this might happen. The most prominent one is that the topic might not exist, which causes the producer to fail to send messages to a non-existent topic. You can use `Connection.createTopic` method to create the topic in Kafka, as shown in `scripts/test_topics.js`. You can also set the `autoCreateTopic` on the `WriterConfig`. You can also create a topic using the `kafka-topics` command: - ```bash - $ docker exec -it lensesio bash - (inside container)$ kafka-topics --create --topic xk6_kafka_avro_topic --bootstrap-server localhost:9092 - (inside container)$ kafka-topics --create --topic xk6_kafka_json_topic --bootstrap-server localhost:9092 - ``` + ```bash + $ docker exec -it lensesio bash + (inside container)$ kafka-topics --create --topic xk6_kafka_avro_topic --bootstrap-server localhost:9092 + (inside container)$ kafka-topics --create --topic xk6_kafka_json_topic --bootstrap-server localhost:9092 + ``` 2. Why does the `reader.consume` keep hanging? - If the `reader.consume` keeps hanging, it might be because the topic doesn't exist or is empty. + If the `reader.consume` keeps hanging, it might be because the topic doesn't exist or is empty. 3. I want to test SASL authentication. How should I do that? - If you want to test SASL authentication, look at [this commit message](https://github.com/mostafa/xk6-kafka/pull/3/commits/403fbc48d13683d836b8033eeeefa48bf2f25c6e), in which I describe how to run a test environment to test SASL authentication. + If you want to test SASL authentication, look at [this commit message](https://github.com/mostafa/xk6-kafka/pull/3/commits/403fbc48d13683d836b8033eeeefa48bf2f25c6e), in which I describe how to run a test environment to test SASL authentication. 4. Why doesn't the consumer group consume messages from the topic? - As explained in issue [#37](https://github.com/mostafa/xk6-kafka/issues/37), multiple inits by k6 cause multiple consumer group instances to be created in the init context, which sometimes causes the random partitions to be selected by each instance. This, in turn, causes confusion when consuming messages from different partitions. This can be solved by using a UUID when naming the consumer group, thereby guaranteeing that the consumer group object was assigned to all partitions in a topic. + As explained in issue [#37](https://github.com/mostafa/xk6-kafka/issues/37), multiple inits by k6 cause multiple consumer group instances to be created in the init context, which sometimes causes the random partitions to be selected by each instance. This, in turn, causes confusion when consuming messages from different partitions. This can be solved by using a UUID when naming the consumer group, thereby guaranteeing that the consumer group object was assigned to all partitions in a topic. 5. Why do I receive a `MessageTooLargeError` when I produce messages bigger than 1 MB? - Kafka has a [maximum message size](https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html) of 1 MB by default, which is set by `message.max.bytes`, and this limit is also applied to the `Writer` object. + Kafka has a [maximum message size](https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html) of 1 MB by default, which is set by `message.max.bytes`, and this limit is also applied to the `Writer` object. - There are two ways to produce larger messages: 1) Change the default value of your Kafka instance to a larger number. 2) Use compression. + There are two ways to produce larger messages: 1) Change the default value of your Kafka instance to a larger number. 2) Use compression. - Remember that the `Writer` object will reject messages larger than the default Kafka message size limit (1 MB). Hence you need to set `batchBytes` to a larger value, for example, `1024 * 1024 * 2` (2 MB). The `batchBytes` refers to the raw uncompressed size of all the keys and values (data) in your array of messages you pass to the `Writer` object. You can calculate the raw data size of your messages using [this example script](https://github.com/mostafa/xk6-kafka/issues/181#issuecomment-1325390880). + Remember that the `Writer` object will reject messages larger than the default Kafka message size limit (1 MB). Hence you need to set `batchBytes` to a larger value, for example, `1024 * 1024 * 2` (2 MB). The `batchBytes` refers to the raw uncompressed size of all the keys and values (data) in your array of messages you pass to the `Writer` object. You can calculate the raw data size of your messages using [this example script](https://github.com/mostafa/xk6-kafka/issues/181#issuecomment-1325390880). 6. Can I consume messages from a consumer group in a topic with multiple partitions? - Yes, you can. Just pass the `groupID` to your `Reader` object. You must not specify the partition anymore. Visit this [documentation article](https://docs.confluent.io/platform/current/clients/consumer.html#concepts) to learn more about Kafka consumer groups. + Yes, you can. Just pass the `groupID` to your `Reader` object. You must not specify the partition anymore. Visit this [documentation article](https://docs.confluent.io/platform/current/clients/consumer.html#concepts) to learn more about Kafka consumer groups. - Remember that you must set `sessionTimeout` on your `Reader` object if the consume function terminates abruptly, thus failing to consume messages. + Remember that you must set `sessionTimeout` on your `Reader` object if the consume function terminates abruptly, thus failing to consume messages. 7. Why does the `Reader.consume` produces an `unable to read message` error? - For performance testing reasons, the `maxWait` of the `Reader` is set to 200ms. If you keep receiving this error, consider increasing it to a larger value. + For performance testing reasons, the `maxWait` of the `Reader` is set to 200ms. If you keep receiving this error, consider increasing it to a larger value. 8. How can I consume from multiple partitions on a single topic? - You can configure your reader to consume from a (list of) topic(s) and its partitions using a consumer group. This can be achieve by setting `groupTopics`, `groupID` and a few other options for timeouts, intervals and lags. Have a look at the [`test_consumer_group.js`](https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_consumer_group.js) example script. + You can configure your reader to consume from a (list of) topic(s) and its partitions using a consumer group. This can be achieve by setting `groupTopics`, `groupID` and a few other options for timeouts, intervals and lags. Have a look at the [`test_consumer_group.js`](https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_consumer_group.js) example script. 9. How can I use autocompletion in IDEs? - Copy [`api-docs/index.d.ts`](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts) into your project directory and reference it at the top of your JavaScript file: + Copy [`api-docs/index.d.ts`](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts) into your project directory and reference it at the top of your JavaScript file: - ```javascript - /// + ```javascript + /// - ... - ``` + ... + ``` 10. Why timeouts give up sooner than expected? @@ -440,6 +440,24 @@ The example scripts are available as `test_.js` with more code a console.log(typeof SECOND); // number ``` +11. Can I catch errors returned by the consume function? + + Yes. You can catch errors by using a try-catch block. The consume function returns an error object. If the consume function raises, the error object will be populated with the error message. + + ```javascript + try { + let messages = reader.consume({ + limit: 10, + }); + } catch (error) { + console.error(error); + } + ``` + +12. I am using a nested Avro schema and getting unknown errors. How can I debug them? + + If you have a nested Avro schema and you want to test it against your data, I created a small tool for it, called [nested-avro-schema](https://github.com/mostafa/nested-avro-schema). This tool will help you to find discrepancies and errors in your schema data, so that you can fix them before you run [xk6-kafka](https://github.com/mostafa/xk6-kafka) tests. Refer to [this comment](https://github.com/mostafa/xk6-kafka/issues/266) for more information. + ## Contributions, Issues and Feedback I'd be thrilled to receive contributions and feedback on this project. You're always welcome to create an issue if you find one (or many). I would do my best to address the issues. Also, feel free to contribute by opening a PR with changes, and I'll do my best to review and merge it as soon as I can.