diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a605ef1d..78326486a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ librdkafka v2.6.0 is a feature release: of given types. * Fix for permanent fetch errors when using a newer Fetch RPC version with an older inter broker protocol (#4806). + * [KIP-460](https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC) Admin Leader Election RPC (#4845) ## Fixes diff --git a/INTRODUCTION.md b/INTRODUCTION.md index cbe951607..b4d0c0e8b 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -2024,7 +2024,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-436 - Start time in stats | 2.3.0 | Supported | | KIP-447 - Producer scalability for EOS | 2.5.0 | Supported | | KIP-455 - AdminAPI: Replica assignment | 2.4.0 (WIP) | Not supported | -| KIP-460 - AdminAPI: electPreferredLeader | 2.4.0 | Not supported | +| KIP-460 - AdminAPI: electLeaders | 2.6.0 | Supported | | KIP-464 - AdminAPI: defaults for createTopics | 2.4.0 | Supported | | KIP-467 - Per-message (sort of) error codes in ProduceResponse | 2.4.0 | Supported | | KIP-480 - Sticky partitioner | 2.4.0 | Supported | @@ -2102,6 +2102,7 @@ release of librdkafka. | 36 | SaslAuthenticate | 2 | 1 | | 37 | CreatePartitions | 3 | 0 | | 42 | DeleteGroups | 2 | 1 | +| 43 | ElectLeaders | 2 | 2 | | 44 | IncrementalAlterConfigs | 1 | 1 | | 47 | OffsetDelete | 0 | 0 | | 50 | DescribeUserScramCredentials | 0 | 0 | diff --git a/examples/.gitignore b/examples/.gitignore index 9b2c65a2f..a2cd3d609 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -21,4 +21,5 @@ list_consumer_group_offsets alter_consumer_group_offsets incremental_alter_configs user_scram -list_offsets \ No newline at end of file +list_offsets +elect_leaders \ No newline at end of file diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 8c0079abe..91851d2cb 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -62,6 +62,9 @@ target_link_libraries(describe_cluster PUBLIC rdkafka) add_executable(list_offsets list_offsets.c ${win32_sources}) target_link_libraries(list_offsets PUBLIC rdkafka) +add_executable(elect_leaders elect_leaders.c ${win32_sources}) +target_link_libraries(elect_leaders PUBLIC rdkafka) + # The targets below has Unix include dirs and do not compile on Windows. if(NOT WIN32) add_executable(rdkafka_example rdkafka_example.c) diff --git a/examples/Makefile b/examples/Makefile index f76702d02..f8b651354 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -13,6 +13,7 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \ incremental_alter_configs \ user_scram \ list_offsets \ + elect_leaders \ misc all: $(EXAMPLES) @@ -153,6 +154,10 @@ list_offsets: ../src/librdkafka.a list_offsets.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) +elect_leaders: ../src/librdkafka.a elect_leaders.c + $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ + ../src/librdkafka.a $(LIBS) + misc: ../src/librdkafka.a misc.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) diff --git a/examples/elect_leaders.c b/examples/elect_leaders.c new file mode 100644 index 000000000..8e4e5ecd2 --- /dev/null +++ b/examples/elect_leaders.c @@ -0,0 +1,320 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2024, Confluent Inc. + * All rights reserved. 
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Example utility that shows how to use Elect Leaders (AdminAPI)
+ * to trigger preferred or unclean elections for
+ * one or more topic partitions.
+ */
+
+#include <stdio.h>
+#include <signal.h>
+#include <string.h>
+#include <stdlib.h>
+
+#ifdef _WIN32
+#include "../win32/wingetopt.h"
+#else
+#include <getopt.h>
+#endif
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h"
+
+const char *argv0;
+
+static rd_kafka_queue_t *queue = NULL; /** Admin result queue.
+                                        *  This is a global so we can
+                                        *  yield in stop() */
+static volatile sig_atomic_t run = 1;
+
+/**
+ * @brief Signal termination of program
+ */
+static void stop(int sig) {
+        if (!run) {
+                fprintf(stderr, "%% Forced termination\n");
+                exit(2);
+        }
+        run = 0;
+        if (queue)
+                rd_kafka_queue_yield(queue);
+}
+
+static void usage(const char *reason, ...) {
+
+        fprintf(stderr,
+                "Elect Leaders usage examples\n"
+                "\n"
+                "Usage: %s <election_type> "
+                "<topic1> <partition1> <topic2> <partition2> ...\n"
+                "\n"
+                "Options:\n"
+                " -b <brokers>    Bootstrap server list to connect to.\n"
+                " -X <prop=val>   Set librdkafka configuration property.\n"
+                "                 See CONFIGURATION.md for full list.\n"
+                " -d <dbg,..>     Enable librdkafka debugging (%s).\n"
+                "\n",
+                argv0, rd_kafka_get_debug_contexts());
+
+        if (reason) {
+                va_list ap;
+                char reasonbuf[512];
+
+                va_start(ap, reason);
+                vsnprintf(reasonbuf, sizeof(reasonbuf), reason, ap);
+                va_end(ap);
+
+                fprintf(stderr, "ERROR: %s\n", reasonbuf);
+        }
+
+        exit(reason ? 1 : 0);
+}
+
+
+#define fatal(...)                                                             \
+        do {                                                                   \
+                fprintf(stderr, "ERROR: ");                                    \
+                fprintf(stderr, __VA_ARGS__);                                  \
+                fprintf(stderr, "\n");                                         \
+                exit(2);                                                       \
+        } while (0)
+
+
+/**
+ * @brief Set config property. Exit on failure.
+ */ +static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { + char errstr[512]; + + if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) + fatal("Failed to set %s=%s: %s", name, val, errstr); +} + +static void +print_elect_leaders_result(const rd_kafka_ElectLeaders_result_t *result) { + const rd_kafka_topic_partition_result_t **results; + size_t results_cnt; + size_t i; + const rd_kafka_ElectLeadersResult_t *res; + + res = rd_kafka_ElectLeaders_result(result); + + results = rd_kafka_ElectLeadersResult_partitions(res, &results_cnt); + printf("ElectLeaders response has %zu partition(s):\n", results_cnt); + for (i = 0; i < results_cnt; i++) { + const rd_kafka_topic_partition_t *partition = + rd_kafka_topic_partition_result_partition(results[i]); + const rd_kafka_error_t *err = + rd_kafka_topic_partition_result_error(results[i]); + if (rd_kafka_error_code(err)) { + printf("%% ElectLeaders failed for %s [%" PRId32 + "] : %s\n", + partition->topic, partition->partition, + rd_kafka_error_string(err)); + } else { + printf("%% ElectLeaders succeeded for %s [%" PRId32 + "]\n", + partition->topic, partition->partition); + } + } +} + +/** + * @brief Parse an integer or fail. + */ +int64_t parse_int(const char *what, const char *str) { + char *end; + unsigned long n = strtoull(str, &end, 0); + + if (end != str + strlen(str)) { + fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n", + what, str); + exit(1); + } + + return (int64_t)n; +} + +static void cmd_elect_leaders(rd_kafka_conf_t *conf, int argc, char **argv) { + rd_kafka_t *rk; + char errstr[512]; + rd_kafka_AdminOptions_t *options; + rd_kafka_event_t *event = NULL; + rd_kafka_topic_partition_list_t *partitions = NULL; + rd_kafka_ElectionType_t election_type; + rd_kafka_ElectLeaders_t *elect_leaders; + int i; + int retval = 0; + + if ((argc - 1) % 2 != 0) { + usage("Invalid number of arguments"); + } + + election_type = parse_int("election_type", argv[0]); + + argc--; + argv++; + if (argc > 0) { + partitions = rd_kafka_topic_partition_list_new(argc / 2); + for (i = 0; i < argc; i += 2) { + rd_kafka_topic_partition_list_add( + partitions, argv[i], + parse_int("partition", argv[i + 1])); + } + } + + elect_leaders = rd_kafka_ElectLeaders_new(election_type, partitions); + + if (partitions) { + rd_kafka_topic_partition_list_destroy(partitions); + } + + /* + * Create consumer instance + * NOTE: rd_kafka_new() takes ownership of the conf object + * and the application must not reference it again after + * this call. 
+ */ + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + if (!rk) + fatal("Failed to create new consumer: %s", errstr); + + /* + * Elect Leaders + */ + queue = rd_kafka_queue_new(rk); + + /* Signal handler for clean shutdown */ + signal(SIGINT, stop); + + + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ELECTLEADERS); + + if (rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))) { + fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); + goto exit; + } + + if (rd_kafka_AdminOptions_set_operation_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))) { + fprintf(stderr, "%% Failed to set operation timeout: %s\n", + errstr); + goto exit; + } + + rd_kafka_ElectLeaders(rk, elect_leaders, options, queue); + + rd_kafka_ElectLeaders_destroy(elect_leaders); + rd_kafka_AdminOptions_destroy(options); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by + * the request timeout set + * above */); + + if (!event) { + /* User hit Ctrl-C, + * see yield call in stop() signal handler */ + fprintf(stderr, "%% Cancelled by user\n"); + + } else if (rd_kafka_event_error(event)) { + rd_kafka_resp_err_t err = rd_kafka_event_error(event); + /* ElectLeaders request failed */ + fprintf(stderr, "%% ElectLeaders failed[%" PRId32 "]: %s\n", + err, rd_kafka_event_error_string(event)); + retval = 1; + goto exit; + } else { + /* ElectLeaders request succeeded */ + const rd_kafka_ElectLeaders_result_t *result; + result = rd_kafka_event_ElectLeaders_result(event); + print_elect_leaders_result(result); + } + + +exit: + if (event) + rd_kafka_event_destroy(event); + + rd_kafka_queue_destroy(queue); + /* Destroy the client instance */ + rd_kafka_destroy(rk); + + exit(retval); +} + + +int main(int argc, char **argv) { + rd_kafka_conf_t *conf; /**< Client configuration object */ + int opt; + argv0 = argv[0]; + + /* + * Create Kafka client configuration place-holder + */ + conf = rd_kafka_conf_new(); + + /* + * Parse common options + */ + while ((opt = getopt(argc, argv, "b:X:d:")) != -1) { + switch (opt) { + case 'b': + conf_set(conf, "bootstrap.servers", optarg); + break; + + case 'X': { + char *name = optarg, *val; + + if (!(val = strchr(name, '='))) + fatal("-X expects a name=value argument"); + + *val = '\0'; + val++; + + conf_set(conf, name, val); + break; + } + + case 'd': + conf_set(conf, "debug", optarg); + break; + + default: + usage("Unknown option %c", (char)opt); + } + } + + cmd_elect_leaders(conf, argc - optind, &argv[optind]); + + return 0; +} diff --git a/src/rdkafka.h b/src/rdkafka.h index f875a88d1..de433b823 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -263,6 +263,8 @@ typedef struct rd_kafka_headers_s rd_kafka_headers_t; typedef struct rd_kafka_group_result_s rd_kafka_group_result_t; typedef struct rd_kafka_acl_result_s rd_kafka_acl_result_t; typedef struct rd_kafka_Uuid_s rd_kafka_Uuid_t; +typedef struct rd_kafka_topic_partition_result_s + rd_kafka_topic_partition_result_t; /* @endcond */ @@ -5551,6 +5553,8 @@ typedef int rd_kafka_event_type_t; #define RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT 0x200000 /** ListOffsets_result_t */ #define RD_KAFKA_EVENT_LISTOFFSETS_RESULT 0x400000 +/** ElectLeaders_result_t */ +#define RD_KAFKA_EVENT_ELECTLEADERS_RESULT 0x800000 /** * @returns the event type for the given event. 
@@ -5709,6 +5713,7 @@ int rd_kafka_event_error_is_fatal(rd_kafka_event_t *rkev);
  *  - RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT
  *  - RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT
  *  - RD_KAFKA_EVENT_LISTOFFSETS_RESULT
+ *  - RD_KAFKA_EVENT_ELECTLEADERS_RESULT
  */
 RD_EXPORT
 void *rd_kafka_event_opaque(rd_kafka_event_t *rkev);
@@ -5832,6 +5837,8 @@ typedef rd_kafka_event_t rd_kafka_DescribeUserScramCredentials_result_t;
 typedef rd_kafka_event_t rd_kafka_AlterUserScramCredentials_result_t;
 /*! ListOffsets result type */
 typedef rd_kafka_event_t rd_kafka_ListOffsets_result_t;
+/*! ElectLeaders result type */
+typedef rd_kafka_event_t rd_kafka_ElectLeaders_result_t;
 
 /**
  * @brief Get CreateTopics result.
@@ -6104,6 +6111,21 @@ rd_kafka_event_DescribeUserScramCredentials_result(rd_kafka_event_t *rkev);
 RD_EXPORT const rd_kafka_AlterUserScramCredentials_result_t *
 rd_kafka_event_AlterUserScramCredentials_result(rd_kafka_event_t *rkev);
 
+/**
+ * @brief Get ElectLeaders result.
+ *
+ * @returns the result of an ElectLeaders request, or NULL if the
+ *          event is of a different type.
+ *
+ * @remark The lifetime of the returned memory is the same
+ *         as the lifetime of the \p rkev object.
+ *
+ * Event types:
+ *   RD_KAFKA_EVENT_ELECTLEADERS_RESULT
+ */
+RD_EXPORT const rd_kafka_ElectLeaders_result_t *
+rd_kafka_event_ElectLeaders_result(rd_kafka_event_t *rkev);
+
 /**
  * @brief Poll a queue for an event for max \p timeout_ms.
  *
@@ -6940,6 +6962,30 @@ rd_kafka_group_result_name(const rd_kafka_group_result_t *groupres);
 RD_EXPORT const rd_kafka_topic_partition_list_t *
 rd_kafka_group_result_partitions(const rd_kafka_group_result_t *groupres);
 
+/**
+ * @brief Topic Partition Result provides a per-topic+partition operation
+ *        result, consisting of a TopicPartition object and an error object.
+ */
+
+/**
+ * @returns the topic partition object from the topic partition result object.
+ * @remark The lifetime of the returned object is the same as the \p
+ *         partition_result.
+ *         The error code is also set inside the topic partition object; for
+ *         detailed error information, use
+ *         rd_kafka_topic_partition_result_error().
+ */
+RD_EXPORT const rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_result_partition(
+    const rd_kafka_topic_partition_result_t *partition_result);
+
+/**
+ * @returns the error object from the topic partition result object.
+ * @remark The lifetime of the returned object is the same as the \p
+ *         partition_result.
+ */
+RD_EXPORT const rd_kafka_error_t *rd_kafka_topic_partition_result_error(
+    const rd_kafka_topic_partition_result_t *partition_result);
 
 /**@}*/
 
@@ -7016,6 +7062,7 @@ typedef enum rd_kafka_admin_op_t {
         RD_KAFKA_ADMIN_OP_DESCRIBETOPICS,  /**< DescribeTopics */
         RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */
         RD_KAFKA_ADMIN_OP_LISTOFFSETS,     /**< ListOffsets */
+        RD_KAFKA_ADMIN_OP_ELECTLEADERS,    /**< ElectLeaders */
         RD_KAFKA_ADMIN_OP__CNT             /**< Number of ops defined */
 } rd_kafka_admin_op_t;
 
@@ -9915,6 +9962,123 @@ RD_EXPORT void rd_kafka_DeleteAcls(rd_kafka_t *rk,
 
 /**@}*/
 
+/**
+ * @name Admin API - Elect Leaders
+ * @{
+ *
+ *
+ *
+ */
+
+/**
+ * @brief Represents an elect leaders request.
+ */
+typedef struct rd_kafka_ElectLeaders_s rd_kafka_ElectLeaders_t;
+
+/**
+ * @enum rd_kafka_ElectionType_t
+ * @brief Apache Kafka Election Types
+ */
+typedef enum rd_kafka_ElectionType_t {
+        RD_KAFKA_ELECTION_TYPE_PREFERRED = 0, /**< Preferred Replica Election */
+        RD_KAFKA_ELECTION_TYPE_UNCLEAN   = 1, /**< Unclean Election */
+} rd_kafka_ElectionType_t;
+
+/**
+ * @brief Create a new rd_kafka_ElectLeaders_t object. This object is later
+ *        passed to rd_kafka_ElectLeaders().
+ *
+ * @param election_type The election type that needs to be performed,
+ *                      preferred or unclean.
+ * @param partitions The topic partitions for which the leader election
+ *                   needs to be performed.
+ *
+ * @returns a newly allocated elect leaders object, or NULL in case
+ *          of an invalid election_type.
+ *          Use rd_kafka_ElectLeaders_destroy() to free the object when done.
+ */
+RD_EXPORT rd_kafka_ElectLeaders_t *
+rd_kafka_ElectLeaders_new(rd_kafka_ElectionType_t election_type,
+                          rd_kafka_topic_partition_list_t *partitions);
+
+/**
+ * @brief Destroy and free a rd_kafka_ElectLeaders_t object previously created
+ *        with rd_kafka_ElectLeaders_new().
+ *
+ * @param elect_leaders The rd_kafka_ElectLeaders_t object to be destroyed.
+ */
+RD_EXPORT void
+rd_kafka_ElectLeaders_destroy(rd_kafka_ElectLeaders_t *elect_leaders);
+
+/**
+ * @brief Elect leaders for the provided topic partitions
+ *        according to the specified election type.
+ *
+ * @param rk Client instance.
+ * @param elect_leaders The elect leaders request containing the
+ *                      election type and partitions information.
+ * @param options Optional admin options, or NULL for defaults.
+ * @param rkqu Queue to emit result on.
+ *
+ * Supported admin options:
+ *  - rd_kafka_AdminOptions_set_operation_timeout() - default 60 seconds.
+ *    Controls how long the brokers will wait for the elections to complete.
+ *  - rd_kafka_AdminOptions_set_request_timeout() - default socket.timeout.ms.
+ *    Controls how long \c rdkafka will wait for the request to complete.
+ *
+ * @remark The result event type emitted on the supplied queue is of type
+ *         \c RD_KAFKA_EVENT_ELECTLEADERS_RESULT.
+ * @remark If the request was created with a NULL partition list, the broker
+ *         will attempt leader election for all partitions, but the results
+ *         will contain only the partitions for which an election was
+ *         attempted or which resulted in an error.
+ */
+RD_EXPORT void rd_kafka_ElectLeaders(rd_kafka_t *rk,
+                                     rd_kafka_ElectLeaders_t *elect_leaders,
+                                     const rd_kafka_AdminOptions_t *options,
+                                     rd_kafka_queue_t *rkqu);
+
+/**
+ * @brief A struct representing the result of the elect leaders admin
+ *        operation.
+ */
+typedef struct rd_kafka_ElectLeadersResult_s rd_kafka_ElectLeadersResult_t;
+
+/**
+ * @brief Get the elect leaders result from the elect leaders result event.
+ *
+ * @param result The elect leaders result event.
+ *
+ * @returns the elect leaders result from the elect leaders result event.
+ */
+RD_EXPORT const rd_kafka_ElectLeadersResult_t *
+rd_kafka_ElectLeaders_result(const rd_kafka_ElectLeaders_result_t *result);
+
+/**
+ * @brief Get the array of topic partition result objects from the
+ *        elect leaders result event and populate the size of the
+ *        array in \p cntp.
+ *
+ * @param result The elect leaders result.
+ * @param cntp The number of elements in the array.
+ *
+ * @returns the array of topic partition result objects from the
+ *          elect leaders result event.
+ */
+RD_EXPORT const rd_kafka_topic_partition_result_t **
+rd_kafka_ElectLeadersResult_partitions(
+    const rd_kafka_ElectLeadersResult_t *result,
+    size_t *cntp);
+
+/**
+ * @brief Destroy and free a rd_kafka_ElectLeadersResult_t object.
+ *
+ * @param result The rd_kafka_ElectLeadersResult_t object to be destroyed.
+ */ +RD_EXPORT void +rd_kafka_ElectLeadersResult_destroy(rd_kafka_ElectLeadersResult_t *result); + +/**@}*/ + /** * @name Security APIs * @{ diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index c1169a088..c4642e805 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -1722,7 +1722,8 @@ static void rd_kafka_AdminOptions_init(rd_kafka_t *rk, options->for_api == RD_KAFKA_ADMIN_OP_DELETETOPICS || options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS || - options->for_api == RD_KAFKA_ADMIN_OP_LISTOFFSETS) + options->for_api == RD_KAFKA_ADMIN_OP_LISTOFFSETS || + options->for_api == RD_KAFKA_ADMIN_OP_ELECTLEADERS) rd_kafka_confval_init_int(&options->operation_timeout, "operation_timeout", -1, 3600 * 1000, rk->rk_conf.admin.request_timeout_ms); @@ -9126,3 +9127,284 @@ void rd_kafka_DescribeCluster(rd_kafka_t *rk, } /**@}*/ + +/** + * @name ElectLeaders + * @{ + * + * + * + * + */ + +/** + * @brief Creates a new rd_kafka_ElectLeaders_t object with the given + * \p election_type and \p partitions. + */ +rd_kafka_ElectLeaders_t * +rd_kafka_ElectLeaders_new(rd_kafka_ElectionType_t election_type, + rd_kafka_topic_partition_list_t *partitions) { + + rd_kafka_ElectLeaders_t *elect_leaders; + + elect_leaders = rd_calloc(1, sizeof(*elect_leaders)); + if (partitions) + elect_leaders->partitions = + rd_kafka_topic_partition_list_copy(partitions); + elect_leaders->election_type = election_type; + + return elect_leaders; +} + +rd_kafka_ElectLeaders_t * +rd_kafka_ElectLeaders_copy(const rd_kafka_ElectLeaders_t *elect_leaders) { + return rd_kafka_ElectLeaders_new(elect_leaders->election_type, + elect_leaders->partitions); +} + +void rd_kafka_ElectLeaders_destroy(rd_kafka_ElectLeaders_t *elect_leaders) { + if (elect_leaders->partitions) + rd_kafka_topic_partition_list_destroy( + elect_leaders->partitions); + rd_free(elect_leaders); +} + +static void rd_kafka_ElectLeaders_free(void *ptr) { + rd_kafka_ElectLeaders_destroy(ptr); +} + +/** + * @brief Creates a new rd_kafka_ElectLeadersResult_t object with the given + * \p error and \p partitions. + */ +static rd_kafka_ElectLeadersResult_t * +rd_kafka_ElectLeadersResult_new(rd_list_t *partitions) { + + rd_kafka_ElectLeadersResult_t *result; + result = rd_calloc(1, sizeof(*result)); + rd_list_init_copy(&result->partitions, partitions); + rd_list_copy_to(&result->partitions, partitions, + rd_kafka_topic_partition_result_copy_opaque, NULL); + return result; +} + +const rd_kafka_ElectLeadersResult_t * +rd_kafka_ElectLeaders_result(const rd_kafka_ElectLeaders_result_t *result) { + return (const rd_kafka_ElectLeadersResult_t *)rd_list_elem( + &result->rko_u.admin_result.results, 0); +} + +const rd_kafka_topic_partition_result_t ** +rd_kafka_ElectLeadersResult_partitions( + const rd_kafka_ElectLeadersResult_t *result, + size_t *cntp) { + *cntp = rd_list_cnt(&result->partitions); + return (const rd_kafka_topic_partition_result_t **) + result->partitions.rl_elems; +} + +void rd_kafka_ElectLeadersResult_destroy( + rd_kafka_ElectLeadersResult_t *result) { + rd_list_destroy(&result->partitions); + rd_free(result); +} + +static void rd_kafka_ElectLeadersResult_free(void *ptr) { + rd_kafka_ElectLeadersResult_destroy(ptr); +} + +/** + * @brief Parse ElectLeadersResponse and create ADMIN_RESULT op. 
+ */ +static rd_kafka_resp_err_t +rd_kafka_ElectLeadersResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_op_t *rko_result = NULL; + rd_kafka_ElectLeadersResult_t *result = NULL; + int16_t top_level_error_code = 0; + int32_t TopicArrayCnt; + int partition_cnt; + rd_list_t partitions_arr; + rd_kafka_ElectLeaders_t *request = + rko_req->rko_u.admin_request.args.rl_elems[0]; + int i; + int j; + + rd_kafka_buf_read_throttle_time(reply); + + if (rd_kafka_buf_ApiVersion(reply) >= 1) { + rd_kafka_buf_read_i16(reply, &top_level_error_code); + } + + if (top_level_error_code) { + rd_kafka_admin_result_fail( + rko_req, top_level_error_code, + "ElectLeaders request failed: %s", + rd_kafka_err2str(top_level_error_code)); + return top_level_error_code; + } + + /* #partitions */ + rd_kafka_buf_read_arraycnt(reply, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); + + if (request->partitions) + partition_cnt = request->partitions->cnt; + else + partition_cnt = 1; + rd_list_init(&partitions_arr, partition_cnt, + rd_kafka_topic_partition_result_free); + memset(partitions_arr.rl_elems, 0, + sizeof(*partitions_arr.rl_elems) * partition_cnt); + + for (i = 0; i < TopicArrayCnt; i++) { + rd_kafka_topic_partition_result_t *partition_result; + rd_kafkap_str_t ktopic; + char *topic; + int32_t PartArrayCnt; + + rd_kafka_buf_read_str(reply, &ktopic); + RD_KAFKAP_STR_DUPA(&topic, &ktopic); + + rd_kafka_buf_read_arraycnt(reply, &PartArrayCnt, + RD_KAFKAP_PARTITIONS_MAX); + + for (j = 0; j < PartArrayCnt; j++) { + int32_t partition; + int16_t partition_error_code; + rd_kafkap_str_t partition_error_msg; + char *partition_errstr; + int orig_pos; + + rd_kafka_buf_read_i32(reply, &partition); + rd_kafka_buf_read_i16(reply, &partition_error_code); + rd_kafka_buf_read_str(reply, &partition_error_msg); + + rd_kafka_buf_skip_tags(reply); + + if (RD_KAFKAP_STR_IS_NULL(&partition_error_msg) || + RD_KAFKAP_STR_LEN(&partition_error_msg) == 0) + partition_errstr = (char *)rd_kafka_err2str( + partition_error_code); + else + RD_KAFKAP_STR_DUPA(&partition_errstr, + &partition_error_msg); + + partition_result = rd_kafka_topic_partition_result_new( + topic, partition, partition_error_code, + partition_errstr); + + if (request->partitions) { + orig_pos = + rd_kafka_topic_partition_list_find_idx( + request->partitions, topic, partition); + + if (orig_pos == -1) { + rd_kafka_buf_parse_fail( + reply, + "Broker returned partition %s " + "[%" PRId32 + "] that was not " + "included in the original request", + topic, partition); + } + + if (rd_list_elem(&partitions_arr, orig_pos) != + NULL) { + rd_kafka_buf_parse_fail( + reply, + "Broker returned partition %s " + "[%" PRId32 "] multiple times", + topic, partition); + } + + rd_list_set(&partitions_arr, orig_pos, + partition_result); + } else { + rd_list_add(&partitions_arr, partition_result); + } + } + rd_kafka_buf_skip_tags(reply); + } + + rd_kafka_buf_skip_tags(reply); + + result = rd_kafka_ElectLeadersResult_new(&partitions_arr); + + rko_result = rd_kafka_admin_result_new(rko_req); + + rd_list_init(&rko_result->rko_u.admin_result.results, 1, + rd_kafka_ElectLeadersResult_free); + + rd_list_add(&rko_result->rko_u.admin_result.results, result); + + *rko_resultp = rko_result; + + rd_list_destroy(&partitions_arr); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +err_parse: + + rd_list_destroy(&partitions_arr); + + if (rko_result) + rd_kafka_op_destroy(rko_result); + + 
rd_snprintf(errstr, errstr_size, + "ElectLeaders response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +void rd_kafka_ElectLeaders(rd_kafka_t *rk, + rd_kafka_ElectLeaders_t *elect_leaders, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + rd_kafka_topic_partition_list_t *copied_partitions = NULL; + + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_ElectLeadersRequest, + rd_kafka_ElectLeadersResponse_parse, + }; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_ELECTLEADERS, + RD_KAFKA_EVENT_ELECTLEADERS_RESULT, + &cbs, options, rkqu->rkqu_q); + + if (elect_leaders->partitions) { + /* Duplicate topic partitions should not be present in the list + */ + copied_partitions = rd_kafka_topic_partition_list_copy( + elect_leaders->partitions); + if (rd_kafka_topic_partition_list_has_duplicates( + copied_partitions, rd_false /* check partition*/)) { + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate partitions specified"); + rd_kafka_admin_common_worker_destroy( + rk, rko, rd_true /*destroy*/); + rd_kafka_topic_partition_list_destroy( + copied_partitions); + return; + } + } + + rd_list_init(&rko->rko_u.admin_request.args, 1, + rd_kafka_ElectLeaders_free); + + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_ElectLeaders_copy(elect_leaders)); + + rd_kafka_q_enq(rk->rk_ops, rko); + if (copied_partitions) + rd_kafka_topic_partition_list_destroy(copied_partitions); +} + +/**@}*/ diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 7641645a3..608a74eee 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -593,4 +593,27 @@ typedef struct rd_kafka_ClusterDescription_s { /**@}*/ +/** + * @name ElectLeaders + * @{ + */ + +/** + * @struct ElectLeaders request object + */ +struct rd_kafka_ElectLeaders_s { + rd_kafka_ElectionType_t election_type; /*Election Type*/ + rd_kafka_topic_partition_list_t + *partitions; /*TopicPartitions for election*/ +}; + +/** + * @struct ElectLeaders result object + */ +struct rd_kafka_ElectLeadersResult_s { + rd_list_t partitions; /**< Type (rd_kafka_topic_partition_result_t *) */ +}; + +/**@}*/ + #endif /* _RDKAFKA_ADMIN_H_ */ diff --git a/src/rdkafka_aux.c b/src/rdkafka_aux.c index d327b6c8b..7d5ccb5b2 100644 --- a/src/rdkafka_aux.c +++ b/src/rdkafka_aux.c @@ -332,3 +332,78 @@ uint16_t rd_kafka_Node_port(const rd_kafka_Node_t *node) { const char *rd_kafka_Node_rack(const rd_kafka_Node_t *node) { return node->rack; } + +/** + * @brief Creates a new rd_kafka_topic_partition_result_t object. 
+ */ + +rd_kafka_topic_partition_result_t * +rd_kafka_topic_partition_result_new(const char *topic, + int32_t partition, + rd_kafka_resp_err_t err, + const char *errstr) { + + rd_kafka_topic_partition_result_t *new_result; + + new_result = rd_calloc(1, sizeof(*new_result)); + new_result->topic_partition = + rd_kafka_topic_partition_new(topic, partition); + new_result->topic_partition->err = err; + new_result->error = rd_kafka_error_new(err, "%s", errstr); + + return new_result; +} + +const rd_kafka_topic_partition_t *rd_kafka_topic_partition_result_partition( + const rd_kafka_topic_partition_result_t *partition_result) { + return partition_result->topic_partition; +} + +const rd_kafka_error_t *rd_kafka_topic_partition_result_error( + const rd_kafka_topic_partition_result_t *partition_result) { + return partition_result->error; +} + +/** + * @brief Destroys the rd_kafka_topic_partition_result_t object. + */ +void rd_kafka_topic_partition_result_destroy( + rd_kafka_topic_partition_result_t *partition_result) { + rd_kafka_topic_partition_destroy(partition_result->topic_partition); + rd_kafka_error_destroy(partition_result->error); + rd_free(partition_result); +} + +/** + * @brief Destroys the array of rd_kafka_topic_partition_result_t objects. + */ +void rd_kafka_topic_partition_result_destroy_array( + rd_kafka_topic_partition_result_t **partition_results, + int32_t partition_results_cnt) { + int32_t i; + for (i = 0; i < partition_results_cnt; i++) { + rd_kafka_topic_partition_result_destroy(partition_results[i]); + } +} + +rd_kafka_topic_partition_result_t *rd_kafka_topic_partition_result_copy( + const rd_kafka_topic_partition_result_t *src) { + return rd_kafka_topic_partition_result_new( + src->topic_partition->topic, src->topic_partition->partition, + src->topic_partition->err, src->error->errstr); +} + +void *rd_kafka_topic_partition_result_copy_opaque(const void *src, + void *opaque) { + return rd_kafka_topic_partition_result_copy( + (const rd_kafka_topic_partition_result_t *)src); +} + +/** + * @brief Frees the memory allocated for a + * topic partition result object by calling + * its destroy function. + */ +void rd_kafka_topic_partition_result_free(void *ptr) { + rd_kafka_topic_partition_result_destroy(ptr); +} diff --git a/src/rdkafka_aux.h b/src/rdkafka_aux.h index fec88cb2a..340fcf708 100644 --- a/src/rdkafka_aux.h +++ b/src/rdkafka_aux.h @@ -128,4 +128,47 @@ void rd_kafka_Node_destroy(rd_kafka_Node_t *node); void rd_kafka_Node_free(void *node); +/** + * @brief Represents a topic partition result. + * + * @remark Public Type + */ +struct rd_kafka_topic_partition_result_s { + rd_kafka_topic_partition_t *topic_partition; + rd_kafka_error_t *error; +}; + +/** + * @brief Create a new rd_kafka_topic_partition_result_t object. + * + * @param topic The topic name. + * @param partition The partition number. + * @param err The error code. + * @param errstr The error string. + * + * @returns a newly allocated rd_kafka_topic_partition_result_t object. + * Use rd_kafka_topic_partition_result_destroy() to free object when + * done. 
+ */ +rd_kafka_topic_partition_result_t * +rd_kafka_topic_partition_result_new(const char *topic, + int32_t partition, + rd_kafka_resp_err_t err, + const char *errstr); + +rd_kafka_topic_partition_result_t *rd_kafka_topic_partition_result_copy( + const rd_kafka_topic_partition_result_t *src); + +void *rd_kafka_topic_partition_result_copy_opaque(const void *src, + void *opaque); + +void rd_kafka_topic_partition_result_destroy( + rd_kafka_topic_partition_result_t *partition_result); + +void rd_kafka_topic_partition_result_destroy_array( + rd_kafka_topic_partition_result_t **partition_results, + int32_t partition_results_cnt); + +void rd_kafka_topic_partition_result_free(void *ptr); + #endif /* _RDKAFKA_AUX_H_ */ diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c index 6ea366a5a..7e8cd200a 100644 --- a/src/rdkafka_event.c +++ b/src/rdkafka_event.c @@ -97,6 +97,8 @@ const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) { return "AlterUserScramCredentials"; case RD_KAFKA_EVENT_LISTOFFSETS_RESULT: return "ListOffsetsResult"; + case RD_KAFKA_EVENT_ELECTLEADERS_RESULT: + return "ElectLeadersResult"; default: return "?unknown?"; } @@ -490,3 +492,11 @@ rd_kafka_event_ListConsumerGroupOffsets_result(rd_kafka_event_t *rkev) { else return (const rd_kafka_ListConsumerGroupOffsets_result_t *)rkev; } + +const rd_kafka_ElectLeaders_result_t * +rd_kafka_event_ElectLeaders_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_ELECTLEADERS_RESULT) + return NULL; + else + return (const rd_kafka_ElectLeaders_result_t *)rkev; +} diff --git a/src/rdkafka_event.h b/src/rdkafka_event.h index 5d22456b3..cf63e414e 100644 --- a/src/rdkafka_event.h +++ b/src/rdkafka_event.h @@ -117,6 +117,7 @@ static RD_UNUSED RD_INLINE int rd_kafka_event_setup(rd_kafka_t *rk, case RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT: case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: case RD_KAFKA_EVENT_LISTOFFSETS_RESULT: + case RD_KAFKA_EVENT_ELECTLEADERS_RESULT: return 1; default: diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index dee5e25bb..60076e835 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -122,7 +122,7 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { "REPLY:RD_KAFKA_OP_SET_TELEMETRY_BROKER", [RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY", - + [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", }; if (type & RD_KAFKA_OP_REPLY) @@ -286,6 +286,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { [RD_KAFKA_OP_SET_TELEMETRY_BROKER] = sizeof(rko->rko_u.telemetry_broker), [RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_ELECTLEADERS] = sizeof(rko->rko_u.admin_request), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -439,6 +440,7 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { case RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS: case RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS: case RD_KAFKA_OP_LISTOFFSETS: + case RD_KAFKA_OP_ELECTLEADERS: rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq); rd_list_destroy(&rko->rko_u.admin_request.args); if (rko->rko_u.admin_request.options.match_consumer_group_states diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 1bf47b644..3af8a5f39 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -186,6 +186,9 @@ typedef enum { telemetry. */ RD_KAFKA_OP_TERMINATE_TELEMETRY, /**< Start termination sequence for telemetry. 
*/ + RD_KAFKA_OP_ELECTLEADERS, /**< Admin: + * ElectLeaders + * u.admin_request */ RD_KAFKA_OP__END } rd_kafka_op_type_t; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index ec7169f50..8e43fd15e 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -5878,6 +5878,95 @@ rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR_NO_ERROR; } +/** + * @brief Construct and send ElectLeadersRequest to \p rkb + * with the partitions (ElectLeaders_t*) in \p elect_leaders, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. + */ +rd_kafka_resp_err_t rd_kafka_ElectLeadersRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *elect_leaders /*(rd_kafka_EleactLeaders_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion; + const rd_kafka_ElectLeaders_t *elect_leaders_request; + int rd_buf_size_estimate; + int op_timeout; + + if (rd_list_cnt(elect_leaders) == 0) { + rd_snprintf(errstr, errstr_size, + "No partitions specified for leader election"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + elect_leaders_request = rd_list_elem(elect_leaders, 0); + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_ElectLeaders, 0, 2, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "ElectLeaders Admin API (KIP-460) not supported " + "by broker, requires broker version >= 2.4.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rd_buf_size_estimate = + 1 /* ElectionType */ + 4 /* #TopicPartitions */ + 4 /* TimeoutMs */; + if (elect_leaders_request->partitions) + rd_buf_size_estimate += + (50 + 4) * elect_leaders_request->partitions->cnt; + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_ElectLeaders, 1, + rd_buf_size_estimate, + ApiVersion >= 2); + + if (ApiVersion >= 1) { + /* Election type */ + rd_kafka_buf_write_i8(rkbuf, + elect_leaders_request->election_type); + } + + /* Write partition list */ + if (elect_leaders_request->partitions) { + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + rd_kafka_buf_write_topic_partitions( + rkbuf, elect_leaders_request->partitions, + rd_false /*don't skip invalid offsets*/, + rd_false /* any offset */, + rd_false /* don't use topic_id */, + rd_true /* use topic_names */, fields); + } else { + rd_kafka_buf_write_arraycnt(rkbuf, -1); + } + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + rd_kafka_buf_write_i32(rkbuf, op_timeout); + + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} /** * @brief Parses and handles an InitProducerId reply. 
* diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 626a8e55b..e534ec340 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -621,6 +621,16 @@ rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb, rd_kafka_resp_cb_t *resp_cb, void *opaque); +rd_kafka_resp_err_t rd_kafka_ElectLeadersRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *elect_leaders /*(rd_kafka_EleactLeaders_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque); + void rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers( rd_tmpabuf_t *tbuf, rd_kafkap_NodeEndpoints_t *NodeEndpoints); diff --git a/tests/0080-admin_ut.c b/tests/0080-admin_ut.c index 3a3b980f0..ad553f9a0 100644 --- a/tests/0080-admin_ut.c +++ b/tests/0080-admin_ut.c @@ -2355,6 +2355,137 @@ static void do_test_AlterUserScramCredentials(const char *what, SUB_TEST_PASS(); } +static void do_test_ElectLeaders(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int with_options, + rd_kafka_ElectionType_t election_type) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_event_t *rkev; + rd_kafka_resp_err_t err; + const rd_kafka_ElectLeaders_result_t *res; + rd_kafka_ElectLeaders_t *duplicate_elect_leaders; + rd_kafka_ElectLeaders_t *elect_leaders; + int exp_timeout = MY_SOCKET_TIMEOUT_MS; + test_timing_t timing; + rd_kafka_topic_partition_list_t *partitions; + char errstr[512]; + void *my_opaque = NULL, *opaque; + + SUB_TEST_QUICK("%s ElectLeaders with %s, timeout %dms", + rd_kafka_name(rk), what, exp_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + partitions = rd_kafka_topic_partition_list_new(3); + rd_kafka_topic_partition_list_add(partitions, "topic1", 9); + rd_kafka_topic_partition_list_add(partitions, "topic3", 15); + rd_kafka_topic_partition_list_add(partitions, "topic1", 1); + elect_leaders = rd_kafka_ElectLeaders_new(election_type, partitions); + rd_kafka_topic_partition_list_destroy(partitions); + + partitions = rd_kafka_topic_partition_list_new(3); + rd_kafka_topic_partition_list_add(partitions, "topic1", 9); + rd_kafka_topic_partition_list_add(partitions, "topic3", 15); + rd_kafka_topic_partition_list_add(partitions, "topic1", 9); + duplicate_elect_leaders = + rd_kafka_ElectLeaders_new(election_type, partitions); + rd_kafka_topic_partition_list_destroy(partitions); + + if (with_options) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_ELECTLEADERS); + + exp_timeout = MY_SOCKET_TIMEOUT_MS * 2; + + err = rd_kafka_AdminOptions_set_request_timeout( + options, exp_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + + if (useq) { + my_opaque = (void *)99981; + rd_kafka_AdminOptions_set_opaque(options, my_opaque); + } + } + + /*Duplicate topic-partition list*/ + TIMING_START(&timing, "ElectLeaders"); + TEST_SAY("Call ElectLeaders, timeout is %dms\n", exp_timeout); + rd_kafka_ElectLeaders(rk, duplicate_elect_leaders, options, q); + TIMING_ASSERT_LATER(&timing, 0, 10); + rd_kafka_ElectLeaders_destroy(duplicate_elect_leaders); + + /* Poll result queue */ + TIMING_START(&timing, "ElectLeaders.queue_poll"); + rkev = rd_kafka_queue_poll(q, exp_timeout + 1000); + TIMING_ASSERT(&timing, 0, exp_timeout + 100); + TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout); + TEST_SAY("ElectLeaders: got %s in %.3fs\n", rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + + /* Convert event to proper result */ + res = 
rd_kafka_event_ElectLeaders_result(rkev); + TEST_ASSERT(res, "expected ElectLeaders_result, not %s", + rd_kafka_event_name(rkev)); + /*Expecting error*/ + err = rd_kafka_event_error(rkev); + const char *event_errstr_duplicate = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err, "expected ElectLeaders to fail"); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, + "expected RD_KAFKA_RESP_ERR__INVALID_ARG, not %s", + rd_kafka_err2name(err)); + TEST_ASSERT(strcmp(event_errstr_duplicate, + "Duplicate partitions specified") == 0, + "expected \"Duplicate partitions specified\", not \"%s\"", + event_errstr_duplicate); + rd_kafka_event_destroy(rkev); + + /*Correct topic-partition list*/ + TIMING_START(&timing, "ElectLeaders"); + TEST_SAY("Call ElectLeaders, timeout is %dms\n", exp_timeout); + rd_kafka_ElectLeaders(rk, elect_leaders, options, q); + TIMING_ASSERT_LATER(&timing, 0, 10); + rd_kafka_ElectLeaders_destroy(elect_leaders); + + /* Poll result queue */ + TIMING_START(&timing, "ElectLeaders.queue_poll"); + rkev = rd_kafka_queue_poll(q, exp_timeout + 1000); + TIMING_ASSERT(&timing, exp_timeout - 100, exp_timeout + 100); + TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout); + TEST_SAY("ElectLeaders: got %s in %.3fs\n", rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + + /* Convert event to proper result */ + res = rd_kafka_event_ElectLeaders_result(rkev); + TEST_ASSERT(res, "expected ElectLeaders_result, not %s", + rd_kafka_event_name(rkev)); + opaque = rd_kafka_event_opaque(rkev); + TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p", + my_opaque, opaque); + /*Expecting error*/ + err = rd_kafka_event_error(rkev); + const char *event_err = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err, "expected ElectLeaders to fail"); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT, + "expected RD_KAFKA_RESP_ERR__TIMED_OUT, not %s", + rd_kafka_err2name(err)); + TEST_ASSERT(strcmp(event_err, + "Failed while waiting for controller: " + "Local: Timed out") == 0, + "expected \"Failed while waiting for controller: " + "Local: Timed out\", not \"%s\"", + event_err); + rd_kafka_event_destroy(rkev); + + if (options) + rd_kafka_AdminOptions_destroy(options); + if (!useq) + rd_kafka_queue_destroy(q); + + SUB_TEST_PASS(); +} + /** * @brief Test a mix of APIs using the same replyq. 
* @@ -2612,17 +2743,18 @@ static void do_test_options(rd_kafka_t *rk) { RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS, \ RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS, \ RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS, \ + RD_KAFKA_ADMIN_OP_ELECTLEADERS, \ RD_KAFKA_ADMIN_OP_ANY /* Must be last */ \ } struct { const char *setter; - const rd_kafka_admin_op_t valid_apis[16]; + const rd_kafka_admin_op_t valid_apis[17]; } matrix[] = { {"request_timeout", _all_apis}, {"operation_timeout", {RD_KAFKA_ADMIN_OP_CREATETOPICS, RD_KAFKA_ADMIN_OP_DELETETOPICS, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, - RD_KAFKA_ADMIN_OP_DELETERECORDS}}, + RD_KAFKA_ADMIN_OP_DELETERECORDS, RD_KAFKA_ADMIN_OP_ELECTLEADERS}}, {"validate_only", {RD_KAFKA_ADMIN_OP_CREATETOPICS, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, @@ -2876,6 +3008,23 @@ static void do_test_apis(rd_kafka_type_t cltype) { do_test_AlterUserScramCredentials("main queue", rk, mainq); do_test_AlterUserScramCredentials("temp queue", rk, NULL); + do_test_ElectLeaders("main queue, options, Preffered Elections", rk, + mainq, 1, RD_KAFKA_ELECTION_TYPE_PREFERRED); + do_test_ElectLeaders("main queue, options, Unclean Elections", rk, + mainq, 1, RD_KAFKA_ELECTION_TYPE_UNCLEAN); + do_test_ElectLeaders("main queue, no options, Preffered Elections", rk, + mainq, 0, RD_KAFKA_ELECTION_TYPE_PREFERRED); + do_test_ElectLeaders("main queue, no options, Unclean Elections", rk, + mainq, 0, RD_KAFKA_ELECTION_TYPE_UNCLEAN); + do_test_ElectLeaders("temp queue, options, Preffered Elections", rk, + NULL, 1, RD_KAFKA_ELECTION_TYPE_PREFERRED); + do_test_ElectLeaders("temp queue, options, Unclean Elections", rk, NULL, + 1, RD_KAFKA_ELECTION_TYPE_UNCLEAN); + do_test_ElectLeaders("temp queue, no options, Preffered Elections", rk, + NULL, 0, RD_KAFKA_ELECTION_TYPE_PREFERRED); + do_test_ElectLeaders("temp queue, no options, Unclean Elections", rk, + NULL, 0, RD_KAFKA_ELECTION_TYPE_UNCLEAN); + do_test_mix(rk, mainq); do_test_configs(rk, mainq);