From 53a6a50bfba000e5fb8956b99ca48ce09282e2ce Mon Sep 17 00:00:00 2001 From: prasanthV <40450906+PrasanthV454@users.noreply.github.com> Date: Mon, 10 Jul 2023 22:41:16 +0530 Subject: [PATCH] Incremental alter configs implementation [KIP-339] (#4110) requires broker version >= 2.3.0 --------- Co-authored-by: Emanuele Sabellico --- CHANGELOG.md | 4 +- INTRODUCTION.md | 67 ++--- LICENSES.txt | 2 +- examples/.gitignore | 1 + examples/CMakeLists.txt | 23 ++ examples/Makefile | 5 + examples/README.md | 1 + examples/incremental_alter_configs.c | 348 ++++++++++++++++++++++++ src/rdkafka.c | 1 + src/rdkafka.h | 121 ++++++++- src/rdkafka_admin.c | 385 +++++++++++++++++++++++---- src/rdkafka_admin.h | 25 +- src/rdkafka_event.c | 11 + src/rdkafka_event.h | 1 + src/rdkafka_int.h | 1 + src/rdkafka_op.c | 147 +++++----- src/rdkafka_op.h | 26 +- src/rdkafka_request.c | 124 +++++++-- src/rdkafka_request.h | 10 + tests/0081-admin.c | 251 +++++++++++++++++ tests/test.c | 82 +++++- tests/test.h | 7 + 22 files changed, 1427 insertions(+), 216 deletions(-) create mode 100644 examples/incremental_alter_configs.c diff --git a/CHANGELOG.md b/CHANGELOG.md index b39a7249ae..95c4dbd0ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,8 +21,10 @@ librdkafka v2.2.0 is a feature release: closes as normal ones (#4294). * Added `fetch.queue.backoff.ms` to the consumer to control how long the consumer backs off next fetch attempt. (@bitemyapp, @edenhill, #2879) - * [KIP-235](https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection): + * [KIP-235](https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection): Add DNS alias support for secured connection (#4292). + * [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API): + IncrementalAlterConfigs API (started by @PrasanthV454, #4110). ## Enhancements diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 32d42bd1aa..d7b9a84a1c 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1909,7 +1909,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-302 - Use all addresses for resolved broker hostname | 2.1.0 | Supported | | KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Supported | | KIP-322 - DeleteTopics disabled error code | 2.1.0 | Supported | -| KIP-339 - AdminAPI: incrementalAlterConfigs | 2.3.0 | Not supported | +| KIP-339 - AdminAPI: incrementalAlterConfigs | 2.3.0 | Supported | | KIP-341 - Update Sticky partition assignment data | 2.3.0 | Not supported (superceeded by KIP-429) | | KIP-342 - Custom SASL OAUTHBEARER extensions | 2.1.0 | Supported | | KIP-345 - Consumer: Static membership | 2.4.0 | Supported | @@ -1964,42 +1964,43 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf ### Supported protocol versions -"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.3.1, while +"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.4.0, while "librdkafka max" is the maximum ApiVersion supported in the latest release of librdkafka. -| ApiKey | Request name | Kafka max | librdkafka max | -| ------- | ------------------- | ----------- | ----------------------- | -| 0 | Produce | 9 | 7 | -| 1 | Fetch | 13 | 11 | -| 2 | ListOffsets | 7 | 2 | -| 3 | Metadata | 12 | 9 | -| 8 | OffsetCommit | 8 | 7 | -| 9 | OffsetFetch | 8 | 7 | -| 10 | FindCoordinator | 4 | 2 | -| 11 | JoinGroup | 9 | 5 | -| 12 | Heartbeat | 4 | 3 | -| 13 | LeaveGroup | 5 | 1 | -| 14 | SyncGroup | 5 | 3 | -| 15 | DescribeGroups | 5 | 4 | -| 16 | ListGroups | 4 | 4 | -| 17 | SaslHandshake | 1 | 1 | -| 18 | ApiVersions | 3 | 3 | -| 19 | CreateTopics | 7 | 4 | -| 20 | DeleteTopics | 6 | 1 | -| 21 | DeleteRecords | 2 | 1 | -| 22 | InitProducerId | 4 | 4 | -| 24 | AddPartitionsToTxn | 3 | 0 | -| 25 | AddOffsetsToTxn | 3 | 0 | -| 26 | EndTxn | 3 | 1 | -| 28 | TxnOffsetCommit | 3 | 3 | -| 32 | DescribeConfigs | 4 | 1 | -| 33 | AlterConfigs | 2 | 1 | -| 36 | SaslAuthenticate | 2 | 1 | -| 37 | CreatePartitions | 3 | 0 | -| 42 | DeleteGroups | 2 | 1 | -| 47 | OffsetDelete | 0 | 0 | +| ApiKey | Request name | Kafka max | librdkafka max | +| ------- | ------------------------| ----------- | ----------------------- | +| 0 | Produce | 9 | 7 | +| 1 | Fetch | 13 | 11 | +| 2 | ListOffsets | 7 | 2 | +| 3 | Metadata | 12 | 9 | +| 8 | OffsetCommit | 8 | 7 | +| 9 | OffsetFetch | 8 | 7 | +| 10 | FindCoordinator | 4 | 2 | +| 11 | JoinGroup | 9 | 5 | +| 12 | Heartbeat | 4 | 3 | +| 13 | LeaveGroup | 5 | 1 | +| 14 | SyncGroup | 5 | 3 | +| 15 | DescribeGroups | 5 | 4 | +| 16 | ListGroups | 4 | 4 | +| 17 | SaslHandshake | 1 | 1 | +| 18 | ApiVersions | 3 | 3 | +| 19 | CreateTopics | 7 | 4 | +| 20 | DeleteTopics | 6 | 1 | +| 21 | DeleteRecords | 2 | 1 | +| 22 | InitProducerId | 4 | 4 | +| 24 | AddPartitionsToTxn | 3 | 0 | +| 25 | AddOffsetsToTxn | 3 | 0 | +| 26 | EndTxn | 3 | 1 | +| 28 | TxnOffsetCommit | 3 | 3 | +| 32 | DescribeConfigs | 4 | 1 | +| 33 | AlterConfigs | 2 | 2 | +| 36 | SaslAuthenticate | 2 | 0 | +| 37 | CreatePartitions | 3 | 0 | +| 42 | DeleteGroups | 2 | 1 | +| 44 | IncrementalAlterConfigs | 1 | 1 | +| 47 | OffsetDelete | 0 | 0 | diff --git a/LICENSES.txt b/LICENSES.txt index d045048a5d..ed89214919 100644 --- a/LICENSES.txt +++ b/LICENSES.txt @@ -3,7 +3,7 @@ LICENSE librdkafka - Apache Kafka C driver library Copyright (c) 2012-2022, Magnus Edenhill - 2023 Confluent Inc. + 2023, Confluent Inc. All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/examples/.gitignore b/examples/.gitignore index 4190608c42..893f84179b 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -17,3 +17,4 @@ list_consumer_groups describe_consumer_groups list_consumer_group_offsets alter_consumer_group_offsets +incremental_alter_configs diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index bbbb89ad90..748abad572 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -26,6 +26,29 @@ target_link_libraries(openssl_engine_example_cpp PUBLIC rdkafka++) add_executable(misc misc.c ${win32_sources}) target_link_libraries(misc PUBLIC rdkafka) +add_executable(idempotent_producer idempotent_producer.c ${win32_sources}) +target_link_libraries(idempotent_producer PUBLIC rdkafka) + +add_executable(transactions transactions.c ${win32_sources}) +target_link_libraries(transactions PUBLIC rdkafka) + +add_executable(delete_records delete_records.c ${win32_sources}) +target_link_libraries(delete_records PUBLIC rdkafka) + +add_executable(list_consumer_groups list_consumer_groups.c ${win32_sources}) +target_link_libraries(list_consumer_groups PUBLIC rdkafka) + +add_executable(describe_consumer_groups describe_consumer_groups.c ${win32_sources}) +target_link_libraries(describe_consumer_groups PUBLIC rdkafka) + +add_executable(list_consumer_group_offsets list_consumer_group_offsets.c ${win32_sources}) +target_link_libraries(list_consumer_group_offsets PUBLIC rdkafka) + +add_executable(alter_consumer_group_offsets alter_consumer_group_offsets.c ${win32_sources}) +target_link_libraries(alter_consumer_group_offsets PUBLIC rdkafka) + +add_executable(incremental_alter_configs incremental_alter_configs.c ${win32_sources}) +target_link_libraries(incremental_alter_configs PUBLIC rdkafka) # The targets below has Unix include dirs and do not compile on Windows. if(NOT WIN32) diff --git a/examples/Makefile b/examples/Makefile index 15fba3c2af..d06e8fc04a 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -8,6 +8,7 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \ describe_consumer_groups \ list_consumer_group_offsets \ alter_consumer_group_offsets \ + incremental_alter_configs \ misc all: $(EXAMPLES) @@ -80,6 +81,10 @@ alter_consumer_group_offsets: ../src/librdkafka.a alter_consumer_group_offsets.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) +incremental_alter_configs: ../src/librdkafka.a incremental_alter_configs.c + $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ + ../src/librdkafka.a $(LIBS) + rdkafka_complex_consumer_example: ../src/librdkafka.a rdkafka_complex_consumer_example.c $(CC) $(CPPFLAGS) $(CFLAGS) rdkafka_complex_consumer_example.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) diff --git a/examples/README.md b/examples/README.md index 3caee3b861..34afac2157 100644 --- a/examples/README.md +++ b/examples/README.md @@ -36,3 +36,4 @@ For more complex uses, see: * [describe_consumer_groups.c](describe_consumer_groups.c) - Describe consumer groups. * [list_consumer_group_offsets.c](list_consumer_group_offsets.c) - List offsets of a consumer group. * [alter_consumer_group_offsets.c](alter_consumer_group_offsets.c) - Alter offsets of a consumer group. + * [incremental_alter_configs.c](incremental_alter_configs.c) - Incrementally alter resource configurations. diff --git a/examples/incremental_alter_configs.c b/examples/incremental_alter_configs.c new file mode 100644 index 0000000000..40a16cf842 --- /dev/null +++ b/examples/incremental_alter_configs.c @@ -0,0 +1,348 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * IncrementalAlterConfigs usage example. + */ + +#include +#include +#include +#include +#include + +#ifdef _WIN32 +#include "../win32/wingetopt.h" +#else +#include +#endif + + +/* Typical include path would be , but this program + * is builtin from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" + + +const char *argv0; + +static rd_kafka_queue_t *queue; /** Admin result queue. + * This is a global so we can + * yield in stop() */ +static volatile sig_atomic_t run = 1; + +/** + * @brief Signal termination of program + */ +static void stop(int sig) { + if (!run) { + fprintf(stderr, "%% Forced termination\n"); + exit(2); + } + run = 0; + rd_kafka_queue_yield(queue); +} + + +static void usage(const char *reason, ...) { + + fprintf(stderr, + "Incremental alter config usage examples\n" + "\n" + "Usage: %s " + " ...\n" + "\n" + "Options:\n" + " -b Bootstrap server list to connect to.\n" + " -X Set librdkafka configuration property.\n" + " See CONFIGURATION.md for full list.\n" + " -d Enable librdkafka debugging (%s).\n" + "\n", + argv0, rd_kafka_get_debug_contexts()); + + if (reason) { + va_list ap; + char reasonbuf[512]; + + va_start(ap, reason); + vsnprintf(reasonbuf, sizeof(reasonbuf), reason, ap); + va_end(ap); + + fprintf(stderr, "ERROR: %s\n", reasonbuf); + } + + exit(reason ? 1 : 0); +} + + +#define fatal(...) \ + do { \ + fprintf(stderr, "ERROR: "); \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, "\n"); \ + exit(2); \ + } while (0) + + +/** + * @brief Set config property. Exit on failure. + */ +static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { + char errstr[512]; + + if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) + fatal("Failed to set %s=%s: %s", name, val, errstr); +} + + + +static void print_alter_configs_result( + FILE *fp, + const rd_kafka_IncrementalAlterConfigs_result_t *result, + const char *prefix) { + size_t i; + size_t config_cnt; + const rd_kafka_ConfigResource_t **configs = + rd_kafka_IncrementalAlterConfigs_result_resources(result, + &config_cnt); + + for (i = 0; i < config_cnt; i++) { + const rd_kafka_ConfigResource_t *config = configs[i]; + + const char *resname = rd_kafka_ConfigResource_name(config); + rd_kafka_ResourceType_t restype = + rd_kafka_ConfigResource_type(config); + rd_kafka_resp_err_t err = rd_kafka_ConfigResource_error(config); + + fprintf(fp, "%sResource type: %s name: %s error: %s: %s\n", + prefix, rd_kafka_ResourceType_name(restype), resname, + rd_kafka_err2str(err), + rd_kafka_ConfigResource_error_string(config)); + } +} + + +/** + * @brief Call rd_kafka_IncrementalAlterConfigs() with a list of + * configs to alter. + */ +static void +cmd_incremental_alter_configs(rd_kafka_conf_t *conf, int argc, char **argv) { + rd_kafka_t *rk; + char errstr[512]; + rd_kafka_AdminOptions_t *options; + rd_kafka_event_t *event = NULL; + rd_kafka_error_t *error; + int retval = 0; + const char *prefix = " "; + int i = 0; + int resources = 0; + int config_cnt; + rd_kafka_ResourceType_t prev_restype = RD_KAFKA_RESOURCE_UNKNOWN; + char *prev_resname = NULL; + rd_kafka_ConfigResource_t **configs; + + if (argc % 5 != 0) { + usage("Invalid number of arguments: %d", argc); + } + + config_cnt = argc / 5; + configs = calloc(config_cnt, sizeof(*configs)); + + for (i = 0; i < config_cnt; i++) { + char *restype_s = argv[i * 5]; + char *resname = argv[i * 5 + 1]; + char *alter_op_type_s = argv[i * 5 + 2]; + char *config_name = argv[i * 5 + 3]; + char *config_value = argv[i * 5 + 4]; + rd_kafka_ConfigResource_t *config; + rd_kafka_AlterConfigOpType_t op_type; + rd_kafka_ResourceType_t restype = + !strcmp(restype_s, "TOPIC") + ? RD_KAFKA_RESOURCE_TOPIC + : !strcmp(restype_s, "BROKER") + ? RD_KAFKA_RESOURCE_BROKER + : RD_KAFKA_RESOURCE_UNKNOWN; + + if (restype == RD_KAFKA_RESOURCE_UNKNOWN) { + usage("Invalid resource type: %s", restype_s); + } + + /* It's not necessary, but cleaner and more efficient to group + * incremental alterations for the same ConfigResource.*/ + if (restype != prev_restype || strcmp(resname, prev_resname)) { + configs[resources++] = + rd_kafka_ConfigResource_new(restype, resname); + } + + config = configs[resources - 1]; + prev_restype = restype; + prev_resname = resname; + + if (!strcmp(alter_op_type_s, "SET")) { + op_type = RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET; + } else if (!strcmp(alter_op_type_s, "APPEND")) { + op_type = RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND; + } else if (!strcmp(alter_op_type_s, "SUBTRACT")) { + op_type = RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT; + } else if (!strcmp(alter_op_type_s, "DELETE")) { + op_type = RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE; + } else { + usage("Invalid alter config operation: %s", + alter_op_type_s); + } + + error = rd_kafka_ConfigResource_add_incremental_config( + config, config_name, op_type, config_value); + + if (error) { + usage( + "Error setting incremental config alteration %s" + " at index %d: %s", + alter_op_type_s, i, rd_kafka_error_string(error)); + } + } + + /* + * Create consumer instance + * NOTE: rd_kafka_new() takes ownership of the conf object + * and the application must not reference it again after + * this call. + */ + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + if (!rk) + fatal("Failed to create new consumer: %s", errstr); + + /* + * Incremental alter configs + */ + queue = rd_kafka_queue_new(rk); + + /* Signal handler for clean shutdown */ + signal(SIGINT, stop); + + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS); + + if (rd_kafka_AdminOptions_set_request_timeout( + options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) { + fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); + goto exit; + } + + rd_kafka_IncrementalAlterConfigs(rk, configs, resources, options, + queue); + + rd_kafka_ConfigResource_destroy_array(configs, resources); + free(configs); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by + * the request timeout set + * above (10s) */); + + if (!event) { + /* User hit Ctrl-C, + * see yield call in stop() signal handler */ + fprintf(stderr, "%% Cancelled by user\n"); + + } else if (rd_kafka_event_error(event)) { + rd_kafka_resp_err_t err = rd_kafka_event_error(event); + /* IncrementalAlterConfigs request failed */ + fprintf(stderr, "%% IncrementalAlterConfigs failed: %s: %s\n", + rd_kafka_err2str(err), + rd_kafka_event_error_string(event)); + goto exit; + + } else { + /* IncrementalAlterConfigs request succeeded, but individual + * configs may have errors. */ + const rd_kafka_IncrementalAlterConfigs_result_t *result = + rd_kafka_event_IncrementalAlterConfigs_result(event); + printf("IncrementalAlterConfigs results:\n"); + print_alter_configs_result(stdout, result, prefix); + } + + +exit: + if (event) + rd_kafka_event_destroy(event); + rd_kafka_AdminOptions_destroy(options); + rd_kafka_queue_destroy(queue); + /* Destroy the client instance */ + rd_kafka_destroy(rk); + + exit(retval); +} + +int main(int argc, char **argv) { + rd_kafka_conf_t *conf; /**< Client configuration object */ + int opt; + argv0 = argv[0]; + + /* + * Create Kafka client configuration place-holder + */ + conf = rd_kafka_conf_new(); + + + /* + * Parse common options + */ + while ((opt = getopt(argc, argv, "b:X:d:")) != -1) { + switch (opt) { + case 'b': + conf_set(conf, "bootstrap.servers", optarg); + break; + + case 'X': { + char *name = optarg, *val; + + if (!(val = strchr(name, '='))) + fatal("-X expects a name=value argument"); + + *val = '\0'; + val++; + + conf_set(conf, name, val); + break; + } + + case 'd': + conf_set(conf, "debug", optarg); + break; + + default: + usage("Unknown option %c", (char)opt); + } + } + + cmd_incremental_alter_configs(conf, argc - optind, &argv[optind]); + + return 0; +} diff --git a/src/rdkafka.c b/src/rdkafka.c index 4f37ecc974..4a8ec30dfb 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -3951,6 +3951,7 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, case RD_KAFKA_OP_DELETETOPICS: case RD_KAFKA_OP_CREATEPARTITIONS: case RD_KAFKA_OP_ALTERCONFIGS: + case RD_KAFKA_OP_INCREMENTALALTERCONFIGS: case RD_KAFKA_OP_DESCRIBECONFIGS: case RD_KAFKA_OP_DELETERECORDS: case RD_KAFKA_OP_DELETEGROUPS: diff --git a/src/rdkafka.h b/src/rdkafka.h index e53123d3cd..0e14b8d273 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -5367,7 +5367,8 @@ typedef int rd_kafka_event_type_t; #define RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT 0x8000 /** AlterConsumerGroupOffsets_result_t */ #define RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT 0x10000 - +/** IncrementalAlterConfigs_result_t */ +#define RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT 0x20000 /** * @returns the event type for the given event. @@ -5514,6 +5515,7 @@ int rd_kafka_event_error_is_fatal(rd_kafka_event_t *rkev); * - RD_KAFKA_EVENT_DESCRIBEACLS_RESULT * - RD_KAFKA_EVENT_DELETEACLS_RESULT * - RD_KAFKA_EVENT_ALTERCONFIGS_RESULT + * - RD_KAFKA_EVENT_INCREMENTAL_ALTERCONFIGS_RESULT * - RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT * - RD_KAFKA_EVENT_DELETEGROUPS_RESULT * - RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT @@ -5617,6 +5619,8 @@ typedef rd_kafka_event_t rd_kafka_DeleteAcls_result_t; typedef rd_kafka_event_t rd_kafka_CreatePartitions_result_t; /*! AlterConfigs result type */ typedef rd_kafka_event_t rd_kafka_AlterConfigs_result_t; +/*! IncrementalAlterConfigs result type */ +typedef rd_kafka_event_t rd_kafka_IncrementalAlterConfigs_result_t; /*! CreateTopics result type */ typedef rd_kafka_event_t rd_kafka_DescribeConfigs_result_t; /*! DeleteRecords result type */ @@ -5682,6 +5686,18 @@ rd_kafka_event_CreatePartitions_result(rd_kafka_event_t *rkev); RD_EXPORT const rd_kafka_AlterConfigs_result_t * rd_kafka_event_AlterConfigs_result(rd_kafka_event_t *rkev); +/** + * @brief Get IncrementalAlterConfigs result. + * + * @returns the result of a IncrementalAlterConfigs request, or NULL if event is + * of different type. + * + * Event types: + * RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT + */ +RD_EXPORT const rd_kafka_IncrementalAlterConfigs_result_t * +rd_kafka_event_IncrementalAlterConfigs_result(rd_kafka_event_t *rkev); + /** * @brief Get DescribeConfigs result. * @@ -6721,6 +6737,8 @@ typedef enum rd_kafka_admin_op_t { RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS, /** AlterConsumerGroupOffsets */ RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS, + /** IncrementalAlterConfigs */ + RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS, RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ } rd_kafka_admin_op_t; @@ -6856,6 +6874,8 @@ rd_kafka_AdminOptions_set_validate_only(rd_kafka_AdminOptions_t *options, * the following exceptions: * - AlterConfigs with a BROKER resource are sent to the broker id set * as the resource name. + * - IncrementalAlterConfigs with a BROKER resource are sent to the broker id + * set as the resource name. * - DescribeConfigs with a BROKER resource are sent to the broker id set * as the resource name. * @@ -7416,6 +7436,18 @@ typedef enum rd_kafka_ResourcePatternType_t { RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT, } rd_kafka_ResourcePatternType_t; +/** + * @enum rd_kafka_AlterConfigOpType_t + * @brief Incremental alter configs operations. + */ +typedef enum rd_kafka_AlterConfigOpType_t { + RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET = 0, + RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE = 1, + RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND = 2, + RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT = 3, + RD_KAFKA_ALTER_CONFIG_OP_TYPE__CNT, +} rd_kafka_AlterConfigOpType_t; + /** * @returns a string representation of the \p resource_pattern_type */ @@ -7481,6 +7513,31 @@ rd_kafka_ConfigResource_set_config(rd_kafka_ConfigResource_t *config, const char *value); +/** + * @brief Add the value of the configuration entry for a subsequent + * incremental alter config operation. APPEND and SUBTRACT are + * possible for list-type configuration entries only. + * + * @param config ConfigResource to add config property to. + * @param name Configuration name, depends on resource type. + * @param op_type Operation type, one of rd_kafka_AlterConfigOpType_t. + * @param value Configuration value, depends on resource type and \p name. + * Set to \c NULL, only with with op_type set to DELETE, + * to revert configuration value to default. + * + * @returns NULL on success, or an rd_kafka_error_t * + * with the corresponding error code and string. + * Error ownership belongs to the caller. + * Possible error codes: + * - RD_KAFKA_RESP_ERR__INVALID_ARG on invalid input. + */ +RD_EXPORT rd_kafka_error_t *rd_kafka_ConfigResource_add_incremental_config( + rd_kafka_ConfigResource_t *config, + const char *name, + rd_kafka_AlterConfigOpType_t op_type, + const char *value); + + /** * @brief Get an array of config entries from a ConfigResource object. * @@ -7546,6 +7603,8 @@ rd_kafka_ConfigResource_error_string(const rd_kafka_ConfigResource_t *config); * since these resource requests must be sent to the broker specified * in the resource. * + * @deprecated Use rd_kafka_IncrementalAlterConfigs(). + * */ RD_EXPORT void rd_kafka_AlterConfigs(rd_kafka_t *rk, @@ -7580,6 +7639,66 @@ rd_kafka_AlterConfigs_result_resources( +/* + * IncrementalAlterConfigs - alter cluster configuration incrementally. + * + */ + + +/** + * @brief Incrementally update the configuration for the specified resources. + * Updates are not transactional so they may succeed for some resources + * while fail for others. The configs for a particular resource are + * updated atomically, executing the corresponding incremental operations + * on the provided configurations. + * + * @remark Requires broker version >=2.3.0 + * + * @remark Multiple resources and resource types may be set, but at most one + * resource of type \c RD_KAFKA_RESOURCE_BROKER is allowed per call + * since these resource requests must be sent to the broker specified + * in the resource. Broker option will be ignored in this case. + * + * @param rk Client instance. + * @param configs Array of config entries to alter. + * @param config_cnt Number of elements in \p configs array. + * @param options Optional admin options, or NULL for defaults. + * @param rkqu Queue to emit result on. + */ +RD_EXPORT +void rd_kafka_IncrementalAlterConfigs(rd_kafka_t *rk, + rd_kafka_ConfigResource_t **configs, + size_t config_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + + +/* + * IncrementalAlterConfigs result type and methods + */ + +/** + * @brief Get an array of resource results from a IncrementalAlterConfigs + * result. + * + * Use \c rd_kafka_ConfigResource_error() and + * \c rd_kafka_ConfigResource_error_string() to extract per-resource error + * results on the returned array elements. + * + * The returned object life-times are the same as the \p result object. + * + * @param result Result object to get resource results from. + * @param cntp is updated to the number of elements in the array. + * + * @returns an array of ConfigResource elements, or NULL if not available. + */ +RD_EXPORT const rd_kafka_ConfigResource_t ** +rd_kafka_IncrementalAlterConfigs_result_resources( + const rd_kafka_IncrementalAlterConfigs_result_t *result, + size_t *cntp); + + + /* * DescribeConfigs - retrieve cluster configuration. * diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 35f4f150d8..6c4419b3a2 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -529,7 +529,8 @@ rd_kafka_admin_result_ret_resources(const rd_kafka_op_t *rko, size_t *cntp) { rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_ALTERCONFIGS || - reqtype == RD_KAFKA_OP_DESCRIBECONFIGS); + reqtype == RD_KAFKA_OP_DESCRIBECONFIGS || + reqtype == RD_KAFKA_OP_INCREMENTALALTERCONFIGS); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_ConfigResource_t **) @@ -1523,20 +1524,6 @@ rd_kafka_AdminOptions_set_validate_only(rd_kafka_AdminOptions_t *options, errstr, errstr_size); } -rd_kafka_resp_err_t -rd_kafka_AdminOptions_set_incremental(rd_kafka_AdminOptions_t *options, - int true_or_false, - char *errstr, - size_t errstr_size) { - rd_snprintf(errstr, errstr_size, - "Incremental updates currently not supported, see KIP-248"); - return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; - - return rd_kafka_confval_set_type(&options->incremental, - RD_KAFKA_CONFVAL_INT, &true_or_false, - errstr, errstr_size); -} - rd_kafka_resp_err_t rd_kafka_AdminOptions_set_broker(rd_kafka_AdminOptions_t *options, int32_t broker_id, @@ -1636,20 +1623,14 @@ static void rd_kafka_AdminOptions_init(rd_kafka_t *rk, if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS || options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || - options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS) + options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS || + options->for_api == RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS) rd_kafka_confval_init_int(&options->validate_only, "validate_only", 0, 1, 0); else rd_kafka_confval_disable(&options->validate_only, "validate_only"); - if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || - options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS) - rd_kafka_confval_init_int(&options->incremental, "incremental", - 0, 1, 0); - else - rd_kafka_confval_disable(&options->incremental, "incremental"); - if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || options->for_api == RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS) rd_kafka_confval_init_int(&options->require_stable_offsets, @@ -1884,18 +1865,14 @@ rd_kafka_NewTopic_set_replica_assignment(rd_kafka_NewTopic_t *new_topic, * @brief Generic constructor of ConfigEntry which is also added to \p rl */ static rd_kafka_resp_err_t -rd_kafka_admin_add_config0(rd_list_t *rl, - const char *name, - const char *value, - rd_kafka_AlterOperation_t operation) { +rd_kafka_admin_add_config0(rd_list_t *rl, const char *name, const char *value) { rd_kafka_ConfigEntry_t *entry; if (!name) return RD_KAFKA_RESP_ERR__INVALID_ARG; - entry = rd_calloc(1, sizeof(*entry)); - entry->kv = rd_strtup_new(name, value); - entry->a.operation = operation; + entry = rd_calloc(1, sizeof(*entry)); + entry->kv = rd_strtup_new(name, value); rd_list_add(rl, entry); @@ -1903,11 +1880,36 @@ rd_kafka_admin_add_config0(rd_list_t *rl, } +/** + * @brief Generic constructor of ConfigEntry for Incremental Alter Operations + * which is also added to \p rl + */ +static rd_kafka_error_t * +rd_kafka_admin_incremental_add_config0(rd_list_t *rl, + const char *name, + rd_kafka_AlterConfigOpType_t op_type, + const char *value) { + rd_kafka_ConfigEntry_t *entry; + + if (!name) { + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, + "Config name is required"); + } + + entry = rd_calloc(1, sizeof(*entry)); + entry->kv = rd_strtup_new(name, value); + entry->a.op_type = op_type; + + rd_list_add(rl, entry); + + return NULL; +} + + rd_kafka_resp_err_t rd_kafka_NewTopic_set_config(rd_kafka_NewTopic_t *new_topic, const char *name, const char *value) { - return rd_kafka_admin_add_config0(&new_topic->config, name, value, - RD_KAFKA_ALTER_OP_ADD); + return rd_kafka_admin_add_config0(&new_topic->config, name, value); } @@ -2832,37 +2834,42 @@ rd_kafka_ConfigResource_add_ConfigEntry(rd_kafka_ConfigResource_t *config, rd_list_add(&config->config, entry); } - rd_kafka_resp_err_t -rd_kafka_ConfigResource_add_config(rd_kafka_ConfigResource_t *config, +rd_kafka_ConfigResource_set_config(rd_kafka_ConfigResource_t *config, const char *name, const char *value) { if (!name || !*name || !value) return RD_KAFKA_RESP_ERR__INVALID_ARG; - return rd_kafka_admin_add_config0(&config->config, name, value, - RD_KAFKA_ALTER_OP_ADD); + return rd_kafka_admin_add_config0(&config->config, name, value); } -rd_kafka_resp_err_t -rd_kafka_ConfigResource_set_config(rd_kafka_ConfigResource_t *config, - const char *name, - const char *value) { - if (!name || !*name || !value) - return RD_KAFKA_RESP_ERR__INVALID_ARG; - return rd_kafka_admin_add_config0(&config->config, name, value, - RD_KAFKA_ALTER_OP_SET); -} +rd_kafka_error_t *rd_kafka_ConfigResource_add_incremental_config( + rd_kafka_ConfigResource_t *config, + const char *name, + rd_kafka_AlterConfigOpType_t op_type, + const char *value) { + if (op_type < 0 || op_type >= RD_KAFKA_ALTER_CONFIG_OP_TYPE__CNT) { + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "Invalid alter config operation type"); + } -rd_kafka_resp_err_t -rd_kafka_ConfigResource_delete_config(rd_kafka_ConfigResource_t *config, - const char *name) { - if (!name || !*name) - return RD_KAFKA_RESP_ERR__INVALID_ARG; + if (!name || !*name) { + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, + !name + ? "Config name is required" + : "Config name mustn't be empty"); + } - return rd_kafka_admin_add_config0(&config->config, name, NULL, - RD_KAFKA_ALTER_OP_DELETE); + if (op_type != RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE && !value) { + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, + "Config value is required"); + } + + return rd_kafka_admin_incremental_add_config0(&config->config, name, + op_type, value); } @@ -2996,7 +3003,7 @@ rd_kafka_AlterConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_buf_read_i32(reply, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); - rd_kafka_buf_read_i32(reply, &res_cnt); + rd_kafka_buf_read_arraycnt(reply, &res_cnt, RD_KAFKAP_CONFIGS_MAX); if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) { rd_snprintf(errstr, errstr_size, @@ -3029,6 +3036,7 @@ rd_kafka_AlterConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_buf_read_i8(reply, &res_type); rd_kafka_buf_read_str(reply, &kres_name); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); + rd_kafka_buf_skip_tags(reply); if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || @@ -3158,6 +3166,277 @@ const rd_kafka_ConfigResource_t **rd_kafka_AlterConfigs_result_resources( +/** + * @name IncrementalAlterConfigs + * @{ + * + * + * + */ + + + +/** + * @brief Parse IncrementalAlterConfigsResponse and create ADMIN_RESULT op. + */ +static rd_kafka_resp_err_t +rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_broker_t *rkb = reply->rkbuf_rkb; + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_op_t *rko_result = NULL; + int32_t res_cnt; + int i; + int32_t Throttle_Time; + + rd_kafka_buf_read_i32(reply, &Throttle_Time); + rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); + + rd_kafka_buf_read_arraycnt(reply, &res_cnt, RD_KAFKAP_CONFIGS_MAX); + + if (res_cnt != rd_list_cnt(&rko_req->rko_u.admin_request.args)) { + rd_snprintf(errstr, errstr_size, + "Received %" PRId32 + " ConfigResources in response " + "when %d were requested", + res_cnt, + rd_list_cnt(&rko_req->rko_u.admin_request.args)); + return RD_KAFKA_RESP_ERR__BAD_MSG; + } + + rko_result = rd_kafka_admin_result_new(rko_req); + + rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, + rd_kafka_ConfigResource_free); + + for (i = 0; i < (int)res_cnt; i++) { + int16_t error_code; + rd_kafkap_str_t error_msg; + int8_t res_type; + rd_kafkap_str_t kres_name; + char *res_name; + char *this_errstr = NULL; + rd_kafka_ConfigResource_t *config; + rd_kafka_ConfigResource_t skel; + int orig_pos; + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &error_msg); + rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_str(reply, &kres_name); + RD_KAFKAP_STR_DUPA(&res_name, &kres_name); + rd_kafka_buf_skip_tags(reply); + + if (error_code) { + if (RD_KAFKAP_STR_IS_NULL(&error_msg) || + RD_KAFKAP_STR_LEN(&error_msg) == 0) + this_errstr = + (char *)rd_kafka_err2str(error_code); + else + RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); + } + + config = rd_kafka_ConfigResource_new(res_type, res_name); + if (!config) { + rd_kafka_log(rko_req->rko_rk, LOG_ERR, "ADMIN", + "IncrementalAlterConfigs returned " + "unsupported ConfigResource #%d with " + "type %d and name \"%s\": ignoring", + i, res_type, res_name); + continue; + } + + config->err = error_code; + if (this_errstr) + config->errstr = rd_strdup(this_errstr); + + /* As a convenience to the application we insert result + * in the same order as they were requested. The broker + * does not maintain ordering unfortunately. */ + skel.restype = config->restype; + skel.name = config->name; + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, + &skel, rd_kafka_ConfigResource_cmp); + if (orig_pos == -1) { + rd_kafka_ConfigResource_destroy(config); + rd_kafka_buf_parse_fail( + reply, + "Broker returned ConfigResource %d,%s " + "that was not " + "included in the original request", + res_type, res_name); + } + + if (rd_list_elem(&rko_result->rko_u.admin_result.results, + orig_pos) != NULL) { + rd_kafka_ConfigResource_destroy(config); + rd_kafka_buf_parse_fail( + reply, + "Broker returned ConfigResource %d,%s " + "multiple times", + res_type, res_name); + } + + rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, + config); + } + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf( + errstr, errstr_size, + "IncrementalAlterConfigs response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +typedef RD_MAP_TYPE(const char *, const rd_bool_t *) map_str_bool; + + +void rd_kafka_IncrementalAlterConfigs(rd_kafka_t *rk, + rd_kafka_ConfigResource_t **configs, + size_t config_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + size_t i; + rd_kafka_resp_err_t err; + char errstr[256]; + rd_bool_t value = rd_true; + + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_IncrementalAlterConfigsRequest, + rd_kafka_IncrementalAlterConfigsResponse_parse, + }; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_INCREMENTALALTERCONFIGS, + RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT, &cbs, options, + rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt, + rd_kafka_ConfigResource_free); + + /* Check duplicate ConfigResource */ + map_str_bool configs_map = RD_MAP_INITIALIZER( + config_cnt, rd_map_str_cmp, rd_map_str_hash, NULL, NULL); + + for (i = 0; i < config_cnt; i++) { + /* 2 chars for the decimal restype + 1 for the comma + * + 1 for the trailing zero. */ + size_t len = 4 + strlen(configs[i]->name); + char *key = rd_alloca(len); + const rd_kafka_ConfigEntry_t **entries; + size_t entry_cnt, j; + + rd_snprintf(key, len - 1, "%d,%s", configs[i]->restype, + configs[i]->name); + if (RD_MAP_GET(&configs_map, key)) { + /* Duplicate ConfigResource found */ + break; + } + RD_MAP_SET(&configs_map, key, &value); + entries = + rd_kafka_ConfigResource_configs(configs[i], &entry_cnt); + + /* Check duplicate ConfigEntry */ + map_str_bool entries_map = RD_MAP_INITIALIZER( + entry_cnt, rd_map_str_cmp, rd_map_str_hash, NULL, NULL); + + for (j = 0; j < entry_cnt; j++) { + const rd_kafka_ConfigEntry_t *entry = entries[j]; + const char *key = rd_kafka_ConfigEntry_name(entry); + + if (RD_MAP_GET(&entries_map, key)) { + /* Duplicate ConfigEntry found */ + break; + } + RD_MAP_SET(&entries_map, key, &value); + } + RD_MAP_DESTROY(&entries_map); + + if (j != entry_cnt) { + RD_MAP_DESTROY(&configs_map); + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate ConfigEntry found"); + rd_kafka_admin_common_worker_destroy( + rk, rko, rd_true /*destroy*/); + return; + } + + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_ConfigResource_copy(configs[i])); + } + + RD_MAP_DESTROY(&configs_map); + + if (i != config_cnt) { + rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate ConfigResource found"); + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_true /*destroy*/); + return; + } + + /* If there's a BROKER resource in the list we need to + * speak directly to that broker rather than the controller. + * + * Multiple BROKER resources are not allowed. + */ + err = rd_kafka_ConfigResource_get_single_broker_id( + &rko->rko_u.admin_request.args, &rko->rko_u.admin_request.broker_id, + errstr, sizeof(errstr)); + if (err) { + rd_kafka_admin_result_fail(rko, err, "%s", errstr); + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_true /*destroy*/); + return; + } + if (rko->rko_u.admin_request.broker_id != + RD_KAFKA_ADMIN_TARGET_CONTROLLER) { + /* Revert broker option to default if altering + * broker configs. */ + err = rd_kafka_confval_set_type( + &rko->rko_u.admin_request.options.broker, + RD_KAFKA_CONFVAL_INT, NULL, errstr, sizeof(errstr)); + if (err) { + rd_kafka_admin_result_fail(rko, err, "%s", errstr); + rd_kafka_admin_common_worker_destroy( + rk, rko, rd_true /*destroy*/); + return; + } + } + + rd_kafka_q_enq(rk->rk_ops, rko); +} + + +const rd_kafka_ConfigResource_t ** +rd_kafka_IncrementalAlterConfigs_result_resources( + const rd_kafka_IncrementalAlterConfigs_result_t *result, + size_t *cntp) { + return rd_kafka_admin_result_ret_resources( + (const rd_kafka_op_t *)result, cntp); +} + +/**@}*/ + + + /** * @name DescribeConfigs * @{ diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 3935c00c04..380f49dd0c 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -31,6 +31,7 @@ #include "rdstring.h" +#include "rdmap.h" #include "rdkafka_error.h" #include "rdkafka_confval.h" @@ -69,15 +70,9 @@ struct rd_kafka_AdminOptions_s { * CreateTopics * CreatePartitions * AlterConfigs + * IncrementalAlterConfigs */ - rd_kafka_confval_t incremental; /**< BOOL: Incremental rather than - * absolute application - * of config. - * Valid for: - * AlterConfigs - */ - rd_kafka_confval_t broker; /**< INT: Explicitly override * broker id to send * requests to. @@ -188,13 +183,6 @@ struct rd_kafka_NewPartitions_s { * @{ */ -/* KIP-248 */ -typedef enum rd_kafka_AlterOperation_t { - RD_KAFKA_ALTER_OP_ADD = 0, - RD_KAFKA_ALTER_OP_SET = 1, - RD_KAFKA_ALTER_OP_DELETE = 2, -} rd_kafka_AlterOperation_t; - struct rd_kafka_ConfigEntry_s { rd_strtup_t *kv; /**< Name/Value pair */ @@ -202,8 +190,9 @@ struct rd_kafka_ConfigEntry_s { /* Attributes: this is a struct for easy copying */ struct { - rd_kafka_AlterOperation_t operation; /**< Operation */ - rd_kafka_ConfigSource_t source; /**< Config source */ + /** Operation type, used for IncrementalAlterConfigs */ + rd_kafka_AlterConfigOpType_t op_type; + rd_kafka_ConfigSource_t source; /**< Config source */ rd_bool_t is_readonly; /**< Value is read-only (on broker) */ rd_bool_t is_default; /**< Value is at its default */ rd_bool_t is_sensitive; /**< Value is sensitive */ @@ -250,6 +239,10 @@ struct rd_kafka_AlterConfigs_result_s { rd_list_t resources; /**< Type (rd_kafka_ConfigResource_t *) */ }; +struct rd_kafka_IncrementalAlterConfigs_result_s { + rd_list_t resources; /**< Type (rd_kafka_ConfigResource_t *) */ +}; + struct rd_kafka_ConfigResource_result_s { rd_list_t resources; /**< Type (struct rd_kafka_ConfigResource *): * List of config resources, sans config diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c index 58b0dc37b2..b2a6843ca2 100644 --- a/src/rdkafka_event.c +++ b/src/rdkafka_event.c @@ -60,6 +60,8 @@ const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) { return "CreatePartitionsResult"; case RD_KAFKA_EVENT_ALTERCONFIGS_RESULT: return "AlterConfigsResult"; + case RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT: + return "IncrementalAlterConfigsResult"; case RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT: return "DescribeConfigsResult"; case RD_KAFKA_EVENT_DELETERECORDS_RESULT: @@ -329,6 +331,15 @@ rd_kafka_event_AlterConfigs_result(rd_kafka_event_t *rkev) { return (const rd_kafka_AlterConfigs_result_t *)rkev; } +const rd_kafka_IncrementalAlterConfigs_result_t * +rd_kafka_event_IncrementalAlterConfigs_result(rd_kafka_event_t *rkev) { + if (!rkev || + rkev->rko_evtype != RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT) + return NULL; + else + return (const rd_kafka_IncrementalAlterConfigs_result_t *)rkev; +} + const rd_kafka_DescribeConfigs_result_t * rd_kafka_event_DescribeConfigs_result(rd_kafka_event_t *rkev) { diff --git a/src/rdkafka_event.h b/src/rdkafka_event.h index e5447f1467..52c2d191a2 100644 --- a/src/rdkafka_event.h +++ b/src/rdkafka_event.h @@ -98,6 +98,7 @@ static RD_UNUSED RD_INLINE int rd_kafka_event_setup(rd_kafka_t *rk, case RD_KAFKA_EVENT_DELETETOPICS_RESULT: case RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT: case RD_KAFKA_EVENT_ALTERCONFIGS_RESULT: + case RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT: case RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT: case RD_KAFKA_EVENT_DELETERECORDS_RESULT: case RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT: diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index f3554963ad..8a29c1f623 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -131,6 +131,7 @@ typedef struct rd_kafka_fetch_pos_s { #define RD_KAFKAP_TOPICS_MAX 1000000 #define RD_KAFKAP_PARTITIONS_MAX 100000 #define RD_KAFKAP_GROUPS_MAX 100000 +#define RD_KAFKAP_CONFIGS_MAX 10000 #define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0) diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index a3ea9a39a6..32cf4b3623 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -44,42 +44,44 @@ rd_atomic32_t rd_kafka_op_cnt; const char *rd_kafka_op2str(rd_kafka_op_type_t type) { int skiplen = 6; static const char *names[RD_KAFKA_OP__END] = { - [RD_KAFKA_OP_NONE] = "REPLY:NONE", - [RD_KAFKA_OP_FETCH] = "REPLY:FETCH", - [RD_KAFKA_OP_ERR] = "REPLY:ERR", - [RD_KAFKA_OP_CONSUMER_ERR] = "REPLY:CONSUMER_ERR", - [RD_KAFKA_OP_DR] = "REPLY:DR", - [RD_KAFKA_OP_STATS] = "REPLY:STATS", - [RD_KAFKA_OP_OFFSET_COMMIT] = "REPLY:OFFSET_COMMIT", - [RD_KAFKA_OP_NODE_UPDATE] = "REPLY:NODE_UPDATE", - [RD_KAFKA_OP_XMIT_BUF] = "REPLY:XMIT_BUF", - [RD_KAFKA_OP_RECV_BUF] = "REPLY:RECV_BUF", - [RD_KAFKA_OP_XMIT_RETRY] = "REPLY:XMIT_RETRY", - [RD_KAFKA_OP_FETCH_START] = "REPLY:FETCH_START", - [RD_KAFKA_OP_FETCH_STOP] = "REPLY:FETCH_STOP", - [RD_KAFKA_OP_SEEK] = "REPLY:SEEK", - [RD_KAFKA_OP_PAUSE] = "REPLY:PAUSE", - [RD_KAFKA_OP_OFFSET_FETCH] = "REPLY:OFFSET_FETCH", - [RD_KAFKA_OP_PARTITION_JOIN] = "REPLY:PARTITION_JOIN", - [RD_KAFKA_OP_PARTITION_LEAVE] = "REPLY:PARTITION_LEAVE", - [RD_KAFKA_OP_REBALANCE] = "REPLY:REBALANCE", - [RD_KAFKA_OP_TERMINATE] = "REPLY:TERMINATE", - [RD_KAFKA_OP_COORD_QUERY] = "REPLY:COORD_QUERY", - [RD_KAFKA_OP_SUBSCRIBE] = "REPLY:SUBSCRIBE", - [RD_KAFKA_OP_ASSIGN] = "REPLY:ASSIGN", - [RD_KAFKA_OP_GET_SUBSCRIPTION] = "REPLY:GET_SUBSCRIPTION", - [RD_KAFKA_OP_GET_ASSIGNMENT] = "REPLY:GET_ASSIGNMENT", - [RD_KAFKA_OP_THROTTLE] = "REPLY:THROTTLE", - [RD_KAFKA_OP_NAME] = "REPLY:NAME", - [RD_KAFKA_OP_CG_METADATA] = "REPLY:CG_METADATA", - [RD_KAFKA_OP_OFFSET_RESET] = "REPLY:OFFSET_RESET", - [RD_KAFKA_OP_METADATA] = "REPLY:METADATA", - [RD_KAFKA_OP_LOG] = "REPLY:LOG", - [RD_KAFKA_OP_WAKEUP] = "REPLY:WAKEUP", - [RD_KAFKA_OP_CREATETOPICS] = "REPLY:CREATETOPICS", - [RD_KAFKA_OP_DELETETOPICS] = "REPLY:DELETETOPICS", - [RD_KAFKA_OP_CREATEPARTITIONS] = "REPLY:CREATEPARTITIONS", - [RD_KAFKA_OP_ALTERCONFIGS] = "REPLY:ALTERCONFIGS", + [RD_KAFKA_OP_NONE] = "REPLY:NONE", + [RD_KAFKA_OP_FETCH] = "REPLY:FETCH", + [RD_KAFKA_OP_ERR] = "REPLY:ERR", + [RD_KAFKA_OP_CONSUMER_ERR] = "REPLY:CONSUMER_ERR", + [RD_KAFKA_OP_DR] = "REPLY:DR", + [RD_KAFKA_OP_STATS] = "REPLY:STATS", + [RD_KAFKA_OP_OFFSET_COMMIT] = "REPLY:OFFSET_COMMIT", + [RD_KAFKA_OP_NODE_UPDATE] = "REPLY:NODE_UPDATE", + [RD_KAFKA_OP_XMIT_BUF] = "REPLY:XMIT_BUF", + [RD_KAFKA_OP_RECV_BUF] = "REPLY:RECV_BUF", + [RD_KAFKA_OP_XMIT_RETRY] = "REPLY:XMIT_RETRY", + [RD_KAFKA_OP_FETCH_START] = "REPLY:FETCH_START", + [RD_KAFKA_OP_FETCH_STOP] = "REPLY:FETCH_STOP", + [RD_KAFKA_OP_SEEK] = "REPLY:SEEK", + [RD_KAFKA_OP_PAUSE] = "REPLY:PAUSE", + [RD_KAFKA_OP_OFFSET_FETCH] = "REPLY:OFFSET_FETCH", + [RD_KAFKA_OP_PARTITION_JOIN] = "REPLY:PARTITION_JOIN", + [RD_KAFKA_OP_PARTITION_LEAVE] = "REPLY:PARTITION_LEAVE", + [RD_KAFKA_OP_REBALANCE] = "REPLY:REBALANCE", + [RD_KAFKA_OP_TERMINATE] = "REPLY:TERMINATE", + [RD_KAFKA_OP_COORD_QUERY] = "REPLY:COORD_QUERY", + [RD_KAFKA_OP_SUBSCRIBE] = "REPLY:SUBSCRIBE", + [RD_KAFKA_OP_ASSIGN] = "REPLY:ASSIGN", + [RD_KAFKA_OP_GET_SUBSCRIPTION] = "REPLY:GET_SUBSCRIPTION", + [RD_KAFKA_OP_GET_ASSIGNMENT] = "REPLY:GET_ASSIGNMENT", + [RD_KAFKA_OP_THROTTLE] = "REPLY:THROTTLE", + [RD_KAFKA_OP_NAME] = "REPLY:NAME", + [RD_KAFKA_OP_CG_METADATA] = "REPLY:CG_METADATA", + [RD_KAFKA_OP_OFFSET_RESET] = "REPLY:OFFSET_RESET", + [RD_KAFKA_OP_METADATA] = "REPLY:METADATA", + [RD_KAFKA_OP_LOG] = "REPLY:LOG", + [RD_KAFKA_OP_WAKEUP] = "REPLY:WAKEUP", + [RD_KAFKA_OP_CREATETOPICS] = "REPLY:CREATETOPICS", + [RD_KAFKA_OP_DELETETOPICS] = "REPLY:DELETETOPICS", + [RD_KAFKA_OP_CREATEPARTITIONS] = "REPLY:CREATEPARTITIONS", + [RD_KAFKA_OP_ALTERCONFIGS] = "REPLY:ALTERCONFIGS", + [RD_KAFKA_OP_INCREMENTALALTERCONFIGS] = + "REPLY:INCREMENTALALTERCONFIGS", [RD_KAFKA_OP_DESCRIBECONFIGS] = "REPLY:DESCRIBECONFIGS", [RD_KAFKA_OP_DELETERECORDS] = "REPLY:DELETERECORDS", [RD_KAFKA_OP_LISTCONSUMERGROUPS] = "REPLY:LISTCONSUMERGROUPS", @@ -195,41 +197,43 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { * if we forgot to add an op type to \ * this list. */ static const size_t op2size[RD_KAFKA_OP__END] = { - [RD_KAFKA_OP_FETCH] = sizeof(rko->rko_u.fetch), - [RD_KAFKA_OP_ERR] = sizeof(rko->rko_u.err), - [RD_KAFKA_OP_CONSUMER_ERR] = sizeof(rko->rko_u.err), - [RD_KAFKA_OP_DR] = sizeof(rko->rko_u.dr), - [RD_KAFKA_OP_STATS] = sizeof(rko->rko_u.stats), - [RD_KAFKA_OP_OFFSET_COMMIT] = sizeof(rko->rko_u.offset_commit), - [RD_KAFKA_OP_NODE_UPDATE] = sizeof(rko->rko_u.node), - [RD_KAFKA_OP_XMIT_BUF] = sizeof(rko->rko_u.xbuf), - [RD_KAFKA_OP_RECV_BUF] = sizeof(rko->rko_u.xbuf), - [RD_KAFKA_OP_XMIT_RETRY] = sizeof(rko->rko_u.xbuf), - [RD_KAFKA_OP_FETCH_START] = sizeof(rko->rko_u.fetch_start), - [RD_KAFKA_OP_FETCH_STOP] = _RD_KAFKA_OP_EMPTY, - [RD_KAFKA_OP_SEEK] = sizeof(rko->rko_u.fetch_start), - [RD_KAFKA_OP_PAUSE] = sizeof(rko->rko_u.pause), - [RD_KAFKA_OP_OFFSET_FETCH] = sizeof(rko->rko_u.offset_fetch), - [RD_KAFKA_OP_PARTITION_JOIN] = _RD_KAFKA_OP_EMPTY, - [RD_KAFKA_OP_PARTITION_LEAVE] = _RD_KAFKA_OP_EMPTY, - [RD_KAFKA_OP_REBALANCE] = sizeof(rko->rko_u.rebalance), - [RD_KAFKA_OP_TERMINATE] = _RD_KAFKA_OP_EMPTY, - [RD_KAFKA_OP_COORD_QUERY] = _RD_KAFKA_OP_EMPTY, - [RD_KAFKA_OP_SUBSCRIBE] = sizeof(rko->rko_u.subscribe), - [RD_KAFKA_OP_ASSIGN] = sizeof(rko->rko_u.assign), - [RD_KAFKA_OP_GET_SUBSCRIPTION] = sizeof(rko->rko_u.subscribe), - [RD_KAFKA_OP_GET_ASSIGNMENT] = sizeof(rko->rko_u.assign), - [RD_KAFKA_OP_THROTTLE] = sizeof(rko->rko_u.throttle), - [RD_KAFKA_OP_NAME] = sizeof(rko->rko_u.name), - [RD_KAFKA_OP_CG_METADATA] = sizeof(rko->rko_u.cg_metadata), - [RD_KAFKA_OP_OFFSET_RESET] = sizeof(rko->rko_u.offset_reset), - [RD_KAFKA_OP_METADATA] = sizeof(rko->rko_u.metadata), - [RD_KAFKA_OP_LOG] = sizeof(rko->rko_u.log), - [RD_KAFKA_OP_WAKEUP] = _RD_KAFKA_OP_EMPTY, - [RD_KAFKA_OP_CREATETOPICS] = sizeof(rko->rko_u.admin_request), - [RD_KAFKA_OP_DELETETOPICS] = sizeof(rko->rko_u.admin_request), - [RD_KAFKA_OP_CREATEPARTITIONS] = sizeof(rko->rko_u.admin_request), - [RD_KAFKA_OP_ALTERCONFIGS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_FETCH] = sizeof(rko->rko_u.fetch), + [RD_KAFKA_OP_ERR] = sizeof(rko->rko_u.err), + [RD_KAFKA_OP_CONSUMER_ERR] = sizeof(rko->rko_u.err), + [RD_KAFKA_OP_DR] = sizeof(rko->rko_u.dr), + [RD_KAFKA_OP_STATS] = sizeof(rko->rko_u.stats), + [RD_KAFKA_OP_OFFSET_COMMIT] = sizeof(rko->rko_u.offset_commit), + [RD_KAFKA_OP_NODE_UPDATE] = sizeof(rko->rko_u.node), + [RD_KAFKA_OP_XMIT_BUF] = sizeof(rko->rko_u.xbuf), + [RD_KAFKA_OP_RECV_BUF] = sizeof(rko->rko_u.xbuf), + [RD_KAFKA_OP_XMIT_RETRY] = sizeof(rko->rko_u.xbuf), + [RD_KAFKA_OP_FETCH_START] = sizeof(rko->rko_u.fetch_start), + [RD_KAFKA_OP_FETCH_STOP] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_SEEK] = sizeof(rko->rko_u.fetch_start), + [RD_KAFKA_OP_PAUSE] = sizeof(rko->rko_u.pause), + [RD_KAFKA_OP_OFFSET_FETCH] = sizeof(rko->rko_u.offset_fetch), + [RD_KAFKA_OP_PARTITION_JOIN] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_PARTITION_LEAVE] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_REBALANCE] = sizeof(rko->rko_u.rebalance), + [RD_KAFKA_OP_TERMINATE] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_COORD_QUERY] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_SUBSCRIBE] = sizeof(rko->rko_u.subscribe), + [RD_KAFKA_OP_ASSIGN] = sizeof(rko->rko_u.assign), + [RD_KAFKA_OP_GET_SUBSCRIPTION] = sizeof(rko->rko_u.subscribe), + [RD_KAFKA_OP_GET_ASSIGNMENT] = sizeof(rko->rko_u.assign), + [RD_KAFKA_OP_THROTTLE] = sizeof(rko->rko_u.throttle), + [RD_KAFKA_OP_NAME] = sizeof(rko->rko_u.name), + [RD_KAFKA_OP_CG_METADATA] = sizeof(rko->rko_u.cg_metadata), + [RD_KAFKA_OP_OFFSET_RESET] = sizeof(rko->rko_u.offset_reset), + [RD_KAFKA_OP_METADATA] = sizeof(rko->rko_u.metadata), + [RD_KAFKA_OP_LOG] = sizeof(rko->rko_u.log), + [RD_KAFKA_OP_WAKEUP] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_CREATETOPICS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_DELETETOPICS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_CREATEPARTITIONS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_ALTERCONFIGS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_INCREMENTALALTERCONFIGS] = + sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_DESCRIBECONFIGS] = sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_DELETERECORDS] = sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_LISTCONSUMERGROUPS] = sizeof(rko->rko_u.admin_request), @@ -392,6 +396,7 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { case RD_KAFKA_OP_DELETETOPICS: case RD_KAFKA_OP_CREATEPARTITIONS: case RD_KAFKA_OP_ALTERCONFIGS: + case RD_KAFKA_OP_INCREMENTALALTERCONFIGS: case RD_KAFKA_OP_DESCRIBECONFIGS: case RD_KAFKA_OP_DELETERECORDS: case RD_KAFKA_OP_LISTCONSUMERGROUPS: diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index f3df1df806..f9ccec2373 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -127,17 +127,20 @@ typedef enum { RD_KAFKA_OP_DELETETOPICS, /**< Admin: DeleteTopics: u.admin_request*/ RD_KAFKA_OP_CREATEPARTITIONS, /**< Admin: CreatePartitions: * u.admin_request*/ - RD_KAFKA_OP_ALTERCONFIGS, /**< Admin: AlterConfigs: u.admin_request*/ - RD_KAFKA_OP_DESCRIBECONFIGS, /**< Admin: DescribeConfigs: - * u.admin_request*/ - RD_KAFKA_OP_DELETERECORDS, /**< Admin: DeleteRecords: - * u.admin_request*/ - RD_KAFKA_OP_LISTCONSUMERGROUPS, /**< Admin: - * ListConsumerGroups - * u.admin_request */ - RD_KAFKA_OP_DESCRIBECONSUMERGROUPS, /**< Admin: - * DescribeConsumerGroups - * u.admin_request */ + RD_KAFKA_OP_ALTERCONFIGS, /**< Admin: AlterConfigs: u.admin_request*/ + RD_KAFKA_OP_INCREMENTALALTERCONFIGS, /**< Admin: + * IncrementalAlterConfigs: + * u.admin_request */ + RD_KAFKA_OP_DESCRIBECONFIGS, /**< Admin: DescribeConfigs: + * u.admin_request*/ + RD_KAFKA_OP_DELETERECORDS, /**< Admin: DeleteRecords: + * u.admin_request*/ + RD_KAFKA_OP_LISTCONSUMERGROUPS, /**< Admin: + * ListConsumerGroups + * u.admin_request */ + RD_KAFKA_OP_DESCRIBECONSUMERGROUPS, /**< Admin: + * DescribeConsumerGroups + * u.admin_request */ RD_KAFKA_OP_DELETEGROUPS, /**< Admin: DeleteGroups: u.admin_request*/ RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS, /**< Admin: * DeleteConsumerGroupOffsets @@ -521,6 +524,7 @@ struct rd_kafka_op_s { * * (rd_kafka_ConfigResource_t *): * AlterConfigs, DescribeConfigs + * IncrementalAlterConfigs */ void *opaque; /**< Application's opaque as set by diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 3b6f75b997..a2b6656de1 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -4214,7 +4214,7 @@ rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb, } ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_AlterConfigs, 0, 1, NULL); + rkb, RD_KAFKAP_AlterConfigs, 0, 2, NULL); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "AlterConfigs (KIP-133) not supported " @@ -4223,52 +4223,121 @@ rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - /* Incremental requires IncrementalAlterConfigs */ - if (rd_kafka_confval_get_int(&options->incremental)) { + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_AlterConfigs, 1, + rd_list_cnt(configs) * 200, + ApiVersion >= 2); + + /* #Resources */ + rd_kafka_buf_write_arraycnt(rkbuf, rd_list_cnt(configs)); + + RD_LIST_FOREACH(config, configs, i) { + const rd_kafka_ConfigEntry_t *entry; + int ei; + + /* ResourceType */ + rd_kafka_buf_write_i8(rkbuf, config->restype); + + /* ResourceName */ + rd_kafka_buf_write_str(rkbuf, config->name, -1); + + /* #Configs */ + rd_kafka_buf_write_arraycnt(rkbuf, + rd_list_cnt(&config->config)); + + RD_LIST_FOREACH(entry, &config->config, ei) { + /* Name */ + rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); + /* Value (nullable) */ + rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); + + rd_kafka_buf_write_tags(rkbuf); + } + + rd_kafka_buf_write_tags(rkbuf); + } + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + /* validate_only */ + rd_kafka_buf_write_i8( + rkbuf, rd_kafka_confval_get_int(&options->validate_only)); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +rd_kafka_resp_err_t rd_kafka_IncrementalAlterConfigsRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *configs /*(ConfigResource_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int i; + const rd_kafka_ConfigResource_t *config; + int op_timeout; + + if (rd_list_cnt(configs) == 0) { + rd_snprintf(errstr, errstr_size, + "No config resources specified"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_IncrementalAlterConfigs, 0, 1, NULL); + if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, - "AlterConfigs.incremental=true (KIP-248) " - "not supported by broker, " - "replaced by IncrementalAlterConfigs"); + "IncrementalAlterConfigs (KIP-339) not supported " + "by broker, requires broker version >= 2.3.0"); rd_kafka_replyq_destroy(&replyq); return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_AlterConfigs, 1, - rd_list_cnt(configs) * 200); + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_IncrementalAlterConfigs, 1, + rd_list_cnt(configs) * 200, ApiVersion >= 1); - /* #resources */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs)); + /* #Resources */ + rd_kafka_buf_write_arraycnt(rkbuf, rd_list_cnt(configs)); RD_LIST_FOREACH(config, configs, i) { const rd_kafka_ConfigEntry_t *entry; int ei; - /* resource_type */ + /* ResourceType */ rd_kafka_buf_write_i8(rkbuf, config->restype); - /* resource_name */ + /* ResourceName */ rd_kafka_buf_write_str(rkbuf, config->name, -1); - /* #config */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&config->config)); + /* #Configs */ + rd_kafka_buf_write_arraycnt(rkbuf, + rd_list_cnt(&config->config)); RD_LIST_FOREACH(entry, &config->config, ei) { - /* config_name */ + /* Name */ rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); - /* config_value (nullable) */ + /* ConfigOperation */ + rd_kafka_buf_write_i8(rkbuf, entry->a.op_type); + /* Value (nullable) */ rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); - if (entry->a.operation != RD_KAFKA_ALTER_OP_SET) { - rd_snprintf(errstr, errstr_size, - "IncrementalAlterConfigs required " - "for add/delete config " - "entries: only set supported " - "by this operation"); - rd_kafka_buf_destroy(rkbuf); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } + rd_kafka_buf_write_tags(rkbuf); } + + rd_kafka_buf_write_tags(rkbuf); } /* timeout */ @@ -4276,7 +4345,7 @@ rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb, if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - /* validate_only */ + /* ValidateOnly */ rd_kafka_buf_write_i8( rkbuf, rd_kafka_confval_get_int(&options->validate_only)); @@ -4287,7 +4356,6 @@ rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR_NO_ERROR; } - /** * @brief Construct and send DescribeConfigsRequest to \p rkb * with the configs (ConfigResource_t*) in \p configs, using diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 79254099cb..6f08e7a8a6 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -341,6 +341,16 @@ rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb, rd_kafka_resp_cb_t *resp_cb, void *opaque); +rd_kafka_resp_err_t rd_kafka_IncrementalAlterConfigsRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *configs /*(ConfigResource_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque); + rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( rd_kafka_broker_t *rkb, const rd_list_t *configs /*(ConfigResource_t*)*/, diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 285b8c0f65..7d8799ea23 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -897,6 +897,252 @@ static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { SUB_TEST_PASS(); } +/** + * @brief Test IncrementalAlterConfigs + */ +static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, + rd_kafka_queue_t *rkqu) { +#define MY_CONFRES_CNT 3 + char *topics[MY_CONFRES_CNT]; + rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; + rd_kafka_AdminOptions_t *options; + rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT]; + rd_kafka_event_t *rkev; + rd_kafka_resp_err_t err; + rd_kafka_error_t *error; + const rd_kafka_IncrementalAlterConfigs_result_t *res; + const rd_kafka_ConfigResource_t **rconfigs; + size_t rconfig_cnt; + char errstr[128]; + const char *errstr2; + int ci = 0; + int i; + int fails = 0; + + SUB_TEST_QUICK(); + + /* + * Only create one topic, the others will be non-existent. + */ + for (i = 0; i < MY_CONFRES_CNT; i++) + rd_strdupa(&topics[i], test_mk_topic_name(__FUNCTION__, 1)); + + test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL); + + test_wait_topic_exists(rk, topics[0], 10000); + + + /** Test the test helper, for use in other tests. */ + do { + const char *broker_id = tsprintf("%d", avail_brokers[0]); + const char *confs_set_append[] = { + "compression.type", "SET", "lz4", + "cleanup.policy", "APPEND", "compact"}; + const char *confs_delete_subtract[] = { + "compression.type", "DELETE", "lz4", + "cleanup.policy", "SUBTRACT", "compact"}; + const char *confs_set_append_broker[] = { + "background.threads", "SET", "9", + "log.cleanup.policy", "APPEND", "compact"}; + const char *confs_delete_subtract_broker[] = { + "background.threads", "DELETE", "", + "log.cleanup.policy", "SUBTRACT", "compact"}; + + TEST_SAY("Testing test helper with SET and APPEND\n"); + test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, + topics[0], confs_set_append, + 2); + TEST_SAY("Testing test helper with SUBTRACT and DELETE\n"); + test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, + topics[0], + confs_delete_subtract, 2); + + TEST_SAY( + "Testing test helper with SET and APPEND with BROKER " + "resource type\n"); + test_IncrementalAlterConfigs_simple( + rk, RD_KAFKA_RESOURCE_BROKER, broker_id, + confs_set_append_broker, 2); + TEST_SAY( + "Testing test helper with SUBTRACT and DELETE with BROKER " + "resource type\n"); + test_IncrementalAlterConfigs_simple( + rk, RD_KAFKA_RESOURCE_BROKER, broker_id, + confs_delete_subtract_broker, 2); + TEST_SAY("End testing test helper\n"); + } while (0); + + /* + * ConfigResource #0: valid topic config + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); + + error = rd_kafka_ConfigResource_add_incremental_config( + configs[ci], "compression.type", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, + "gzip"); + TEST_ASSERT(!error, "%s", rd_kafka_error_string(error)); + + error = rd_kafka_ConfigResource_add_incremental_config( + configs[ci], "flush.ms", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, + "12345678"); + TEST_ASSERT(!error, "%s", rd_kafka_error_string(error)); + + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + ci++; + + + if (test_broker_version >= TEST_BRKVER(1, 1, 0, 0)) { + /* + * ConfigResource #1: valid broker config + */ + configs[ci] = rd_kafka_ConfigResource_new( + RD_KAFKA_RESOURCE_BROKER, + tsprintf("%" PRId32, avail_brokers[0])); + + error = rd_kafka_ConfigResource_add_incremental_config( + configs[ci], "sasl.kerberos.min.time.before.relogin", + RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, "58000"); + TEST_ASSERT(!error, "%s", rd_kafka_error_string(error)); + + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + ci++; + } else { + TEST_WARN( + "Skipping RESOURCE_BROKER test on unsupported " + "broker version\n"); + } + + /* + * ConfigResource #2: valid topic config, non-existent topic + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); + + error = rd_kafka_ConfigResource_add_incremental_config( + configs[ci], "compression.type", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, + "lz4"); + TEST_ASSERT(!error, "%s", rd_kafka_error_string(error)); + + error = rd_kafka_ConfigResource_add_incremental_config( + configs[ci], "offset.metadata.max.bytes", + RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, "12345"); + TEST_ASSERT(!error, "%s", rd_kafka_error_string(error)); + + if (test_broker_version >= TEST_BRKVER(2, 7, 0, 0)) + exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + else + exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN; + ci++; + + /* + * Timeout options + */ + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS); + err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr, + sizeof(errstr)); + TEST_ASSERT(!err, "%s", errstr); + + + /* + * Fire off request + */ + rd_kafka_IncrementalAlterConfigs(rk, configs, ci, options, rkqu); + + rd_kafka_AdminOptions_destroy(options); + + /* + * Wait for result + */ + rkev = test_wait_admin_result( + rkqu, RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT, 10000 + 1000); + + /* + * Extract result + */ + res = rd_kafka_event_IncrementalAlterConfigs_result(rkev); + TEST_ASSERT(res, "Expected AlterConfigs result, not %s", + rd_kafka_event_name(rkev)); + + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(!err, "Expected success, not %s: %s", + rd_kafka_err2name(err), errstr2); + + rconfigs = rd_kafka_IncrementalAlterConfigs_result_resources( + res, &rconfig_cnt); + TEST_ASSERT((int)rconfig_cnt == ci, + "Expected %d result resources, got %" PRIusz "\n", ci, + rconfig_cnt); + + /* + * Verify status per resource + */ + for (i = 0; i < (int)rconfig_cnt; i++) { + const rd_kafka_ConfigEntry_t **entries; + size_t entry_cnt; + + err = rd_kafka_ConfigResource_error(rconfigs[i]); + errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]); + + entries = + rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt); + + TEST_SAY( + "ConfigResource #%d: type %s (%d), \"%s\": " + "%" PRIusz " ConfigEntries, error %s (%s)\n", + i, + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(rconfigs[i])), + rd_kafka_ConfigResource_type(rconfigs[i]), + rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt, + rd_kafka_err2name(err), errstr2 ? errstr2 : ""); + + test_print_ConfigEntry_array(entries, entry_cnt, 1); + + if (rd_kafka_ConfigResource_type(rconfigs[i]) != + rd_kafka_ConfigResource_type(configs[i]) || + strcmp(rd_kafka_ConfigResource_name(rconfigs[i]), + rd_kafka_ConfigResource_name(configs[i]))) { + TEST_FAIL_LATER( + "ConfigResource #%d: " + "expected type %s name %s, " + "got type %s name %s", + i, + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(configs[i])), + rd_kafka_ConfigResource_name(configs[i]), + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(rconfigs[i])), + rd_kafka_ConfigResource_name(rconfigs[i])); + fails++; + continue; + } + + + if (err != exp_err[i]) { + TEST_FAIL_LATER( + "ConfigResource #%d: " + "expected %s (%d), got %s (%s)", + i, rd_kafka_err2name(exp_err[i]), exp_err[i], + rd_kafka_err2name(err), errstr2 ? errstr2 : ""); + fails++; + } + } + + TEST_ASSERT(!fails, "See %d previous failure(s)", fails); + + rd_kafka_event_destroy(rkev); + + rd_kafka_ConfigResource_destroy_array(configs, ci); + + TEST_LATER_CHECK(); +#undef MY_CONFRES_CNT + + SUB_TEST_PASS(); +} + /** @@ -3714,6 +3960,11 @@ static void do_test_apis(rd_kafka_type_t cltype) { /* AlterConfigs */ do_test_AlterConfigs(rk, mainq); + if (test_broker_version >= TEST_BRKVER(2, 3, 0, 0)) { + /* IncrementalAlterConfigs */ + do_test_IncrementalAlterConfigs(rk, mainq); + } + /* DescribeConfigs */ do_test_DescribeConfigs(rk, mainq); diff --git a/tests/test.c b/tests/test.c index 0027d28c0d..06ade264eb 100644 --- a/tests/test.c +++ b/tests/test.c @@ -5835,6 +5835,7 @@ rd_kafka_event_t *test_wait_admin_result(rd_kafka_queue_t *q, * * Supported APIs: * - AlterConfigs + * - IncrementalAlterConfigs * - CreatePartitions * - CreateTopics * - DeleteGroups @@ -5918,6 +5919,17 @@ rd_kafka_resp_err_t test_wait_topic_admin_result(rd_kafka_queue_t *q, cres = rd_kafka_AlterConfigs_result_resources(res, &cres_cnt); + } else if (evtype == RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT) { + const rd_kafka_IncrementalAlterConfigs_result_t *res; + + if (!(res = + rd_kafka_event_IncrementalAlterConfigs_result(rkev))) + TEST_FAIL( + "Expected a IncrementalAlterConfigs result, not %s", + rd_kafka_event_name(rkev)); + + cres = rd_kafka_IncrementalAlterConfigs_result_resources( + res, &cres_cnt); } else if (evtype == RD_KAFKA_EVENT_CREATEACLS_RESULT) { const rd_kafka_CreateAcls_result_t *res; @@ -6496,7 +6508,7 @@ rd_kafka_resp_err_t test_AlterConfigs_simple(rd_kafka_t *rk, size_t result_cnt; const rd_kafka_ConfigEntry_t **configents; size_t configent_cnt; - + config_cnt = config_cnt * 2; q = rd_kafka_queue_new(rk); @@ -6581,6 +6593,74 @@ rd_kafka_resp_err_t test_AlterConfigs_simple(rd_kafka_t *rk, return err; } +/** + * @brief Delta Incremental Alter configuration for the given resource, + * overwriting/setting the configs provided in \p configs. + * Existing configuration remains intact. + * + * @param configs 'const char *name, const char *op_type', const char *value' + * tuples + * @param config_cnt is the number of tuples in \p configs + */ +rd_kafka_resp_err_t +test_IncrementalAlterConfigs_simple(rd_kafka_t *rk, + rd_kafka_ResourceType_t restype, + const char *resname, + const char **configs, + size_t config_cnt) { + rd_kafka_queue_t *q; + rd_kafka_ConfigResource_t *confres; + size_t i; + rd_kafka_resp_err_t err; + rd_kafka_error_t *error; + + + TEST_SAY("Incrementally altering configuration for %d %s\n", restype, + resname); + + q = rd_kafka_queue_new(rk); + confres = rd_kafka_ConfigResource_new(restype, resname); + config_cnt = config_cnt * 3; + + /* Apply the configuration to change. */ + for (i = 0; i < config_cnt; i += 3) { + const char *confname = configs[i]; + const char *op_string = configs[i + 1]; + const char *confvalue = configs[i + 2]; + rd_kafka_AlterConfigOpType_t op_type = + RD_KAFKA_ALTER_CONFIG_OP_TYPE__CNT; + + if (!strcmp(op_string, "SET")) + op_type = RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET; + else if (!strcmp(op_string, "DELETE")) + op_type = RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE; + else if (!strcmp(op_string, "APPEND")) + op_type = RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND; + else if (!strcmp(op_string, "SUBTRACT")) + op_type = RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT; + else + TEST_FAIL("Unknown op type %s\n", op_string); + + error = rd_kafka_ConfigResource_add_incremental_config( + confres, confname, op_type, confvalue); + TEST_ASSERT(!error, + "Failed to set incremental %s config %s=%s on " + "local resource object", + op_string, confname, confvalue); + } + + rd_kafka_IncrementalAlterConfigs(rk, &confres, 1, NULL, q); + + rd_kafka_ConfigResource_destroy(confres); + + err = test_wait_topic_admin_result( + q, RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT, NULL, 15 * 1000); + + rd_kafka_queue_destroy(q); + + return err; +} + /** * @brief Topic Admin API helpers * diff --git a/tests/test.h b/tests/test.h index 596824f918..a1f5cc2cb6 100644 --- a/tests/test.h +++ b/tests/test.h @@ -801,6 +801,13 @@ rd_kafka_resp_err_t test_AlterConfigs_simple(rd_kafka_t *rk, const char **configs, size_t config_cnt); +rd_kafka_resp_err_t +test_IncrementalAlterConfigs_simple(rd_kafka_t *rk, + rd_kafka_ResourceType_t restype, + const char *resname, + const char **configs, + size_t config_cnt); + rd_kafka_resp_err_t test_DeleteGroups_simple(rd_kafka_t *rk, rd_kafka_queue_t *useq, char **groups,