From c23adb959192aadfcf7461371bb9f8dfe79958d2 Mon Sep 17 00:00:00 2001 From: mahajanadhitya <115617755+mahajanadhitya@users.noreply.github.com> Date: Mon, 10 Jul 2023 23:34:51 +0530 Subject: [PATCH] Scram Config API in Admin Client [KIP-554] (#4241) requires broker version >= 2.7.0 --------- Co-authored-by: Emanuele Sabellico --- CHANGELOG.md | 1 + INTRODUCTION.md | 68 +-- examples/.gitignore | 1 + examples/CMakeLists.txt | 3 + examples/Makefile | 5 + examples/README.md | 1 + examples/user_scram.c | 492 ++++++++++++++++++++++ src/rdkafka.c | 5 +- src/rdkafka.h | 291 ++++++++++++- src/rdkafka_admin.c | 796 +++++++++++++++++++++++++++++++++++- src/rdkafka_admin.h | 11 +- src/rdkafka_buf.h | 71 ++-- src/rdkafka_cgrp.c | 8 +- src/rdkafka_event.c | 23 ++ src/rdkafka_event.h | 2 + src/rdkafka_mock_handlers.c | 13 +- src/rdkafka_msgset_reader.c | 8 +- src/rdkafka_op.c | 10 + src/rdkafka_op.h | 6 + src/rdkafka_partition.c | 2 - src/rdkafka_proto.h | 8 +- src/rdkafka_request.c | 2 +- src/rdkafka_request.h | 1 - src/rdkafka_sasl_scram.c | 51 +-- src/rdkafka_ssl.c | 53 +++ src/rdkafka_ssl.h | 7 + tests/0080-admin_ut.c | 140 +++++++ tests/0081-admin.c | 322 ++++++++++++++- 28 files changed, 2253 insertions(+), 148 deletions(-) create mode 100644 examples/user_scram.c diff --git a/CHANGELOG.md b/CHANGELOG.md index 95c4dbd0ff..e7577a989a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ librdkafka v2.2.0 is a feature release: Add DNS alias support for secured connection (#4292). * [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API): IncrementalAlterConfigs API (started by @PrasanthV454, #4110). + * [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241). ## Enhancements diff --git a/INTRODUCTION.md b/INTRODUCTION.md index d7b9a84a1c..7cb45a2f3d 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1945,6 +1945,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-526 - Reduce Producer Metadata Lookups for Large Number of Topics | 2.5.0 | Not supported | | KIP-533 - Add default API timeout to AdminClient | 2.5.0 | Not supported | | KIP-546 - Add Client Quota APIs to AdminClient | 2.6.0 | Not supported | +| KIP-554 - Add Broker-side SCRAM Config API | 2.7.0 | Supported | | KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported | | KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported | | KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported | @@ -1969,39 +1970,40 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf release of librdkafka. -| ApiKey | Request name | Kafka max | librdkafka max | -| ------- | ------------------------| ----------- | ----------------------- | -| 0 | Produce | 9 | 7 | -| 1 | Fetch | 13 | 11 | -| 2 | ListOffsets | 7 | 2 | -| 3 | Metadata | 12 | 9 | -| 8 | OffsetCommit | 8 | 7 | -| 9 | OffsetFetch | 8 | 7 | -| 10 | FindCoordinator | 4 | 2 | -| 11 | JoinGroup | 9 | 5 | -| 12 | Heartbeat | 4 | 3 | -| 13 | LeaveGroup | 5 | 1 | -| 14 | SyncGroup | 5 | 3 | -| 15 | DescribeGroups | 5 | 4 | -| 16 | ListGroups | 4 | 4 | -| 17 | SaslHandshake | 1 | 1 | -| 18 | ApiVersions | 3 | 3 | -| 19 | CreateTopics | 7 | 4 | -| 20 | DeleteTopics | 6 | 1 | -| 21 | DeleteRecords | 2 | 1 | -| 22 | InitProducerId | 4 | 4 | -| 24 | AddPartitionsToTxn | 3 | 0 | -| 25 | AddOffsetsToTxn | 3 | 0 | -| 26 | EndTxn | 3 | 1 | -| 28 | TxnOffsetCommit | 3 | 3 | -| 32 | DescribeConfigs | 4 | 1 | -| 33 | AlterConfigs | 2 | 2 | -| 36 | SaslAuthenticate | 2 | 0 | -| 37 | CreatePartitions | 3 | 0 | -| 42 | DeleteGroups | 2 | 1 | -| 44 | IncrementalAlterConfigs | 1 | 1 | -| 47 | OffsetDelete | 0 | 0 | - +| ApiKey | Request name | Kafka max | librdkafka max | +| ------- | ------------------------------| ----------- | ----------------------- | +| 0 | Produce | 9 | 7 | +| 1 | Fetch | 13 | 11 | +| 2 | ListOffsets | 7 | 2 | +| 3 | Metadata | 12 | 9 | +| 8 | OffsetCommit | 8 | 7 | +| 9 | OffsetFetch | 8 | 7 | +| 10 | FindCoordinator | 4 | 2 | +| 11 | JoinGroup | 9 | 5 | +| 12 | Heartbeat | 4 | 3 | +| 13 | LeaveGroup | 5 | 1 | +| 14 | SyncGroup | 5 | 3 | +| 15 | DescribeGroups | 5 | 4 | +| 16 | ListGroups | 4 | 4 | +| 17 | SaslHandshake | 1 | 1 | +| 18 | ApiVersions | 3 | 3 | +| 19 | CreateTopics | 7 | 4 | +| 20 | DeleteTopics | 6 | 1 | +| 21 | DeleteRecords | 2 | 1 | +| 22 | InitProducerId | 4 | 4 | +| 24 | AddPartitionsToTxn | 3 | 0 | +| 25 | AddOffsetsToTxn | 3 | 0 | +| 26 | EndTxn | 3 | 1 | +| 28 | TxnOffsetCommit | 3 | 3 | +| 32 | DescribeConfigs | 4 | 1 | +| 33 | AlterConfigs | 2 | 2 | +| 36 | SaslAuthenticate | 2 | 1 | +| 37 | CreatePartitions | 3 | 0 | +| 42 | DeleteGroups | 2 | 1 | +| 44 | IncrementalAlterConfigs | 1 | 1 | +| 47 | OffsetDelete | 0 | 0 | +| 50 | DescribeUserScramCredentials | 0 | 0 | +| 51 | AlterUserScramCredentials | 0 | 0 | # Recommendations for language binding developers diff --git a/examples/.gitignore b/examples/.gitignore index 893f84179b..4df12d6233 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -18,3 +18,4 @@ describe_consumer_groups list_consumer_group_offsets alter_consumer_group_offsets incremental_alter_configs +user_scram \ No newline at end of file diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 748abad572..b3f974424f 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -50,6 +50,9 @@ target_link_libraries(alter_consumer_group_offsets PUBLIC rdkafka) add_executable(incremental_alter_configs incremental_alter_configs.c ${win32_sources}) target_link_libraries(incremental_alter_configs PUBLIC rdkafka) +add_executable(user_scram user_scram.c ${win32_sources}) +target_link_libraries(user_scram PUBLIC rdkafka) + # The targets below has Unix include dirs and do not compile on Windows. if(NOT WIN32) add_executable(rdkafka_example rdkafka_example.c) diff --git a/examples/Makefile b/examples/Makefile index d06e8fc04a..add586de8c 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -9,6 +9,7 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \ list_consumer_group_offsets \ alter_consumer_group_offsets \ incremental_alter_configs \ + user_scram \ misc all: $(EXAMPLES) @@ -133,6 +134,10 @@ openssl_engine_example_cpp: ../src-cpp/librdkafka++.a ../src/librdkafka.a openss $(CXX) $(CPPFLAGS) $(CXXFLAGS) openssl_engine_example.cpp -o $@ $(LDFLAGS) \ ../src-cpp/librdkafka++.a ../src/librdkafka.a $(LIBS) +user_scram: ../src/librdkafka.a user_scram.c + $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ + ../src/librdkafka.a $(LIBS) + misc: ../src/librdkafka.a misc.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) diff --git a/examples/README.md b/examples/README.md index 34afac2157..32e93e6056 100644 --- a/examples/README.md +++ b/examples/README.md @@ -37,3 +37,4 @@ For more complex uses, see: * [list_consumer_group_offsets.c](list_consumer_group_offsets.c) - List offsets of a consumer group. * [alter_consumer_group_offsets.c](alter_consumer_group_offsets.c) - Alter offsets of a consumer group. * [incremental_alter_configs.c](incremental_alter_configs.c) - Incrementally alter resource configurations. + * [user_scram.c](user_scram.c) - Describe or alter user SCRAM credentials. diff --git a/examples/user_scram.c b/examples/user_scram.c new file mode 100644 index 0000000000..95d6809b40 --- /dev/null +++ b/examples/user_scram.c @@ -0,0 +1,492 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SH THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Example utility that shows how to use SCRAM APIs (AdminAPI) + * DescribeUserScramCredentials -> Describe user SCRAM credentials + * AlterUserScramCredentials -> Upsert or delete user SCRAM credentials + */ + +#include +#include +#include +#include +#include + +#ifdef _WIN32 +#include "../win32/wingetopt.h" +#else +#include +#endif + +/* Typical include path would be , but this program + * is builtin from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" + +const char *argv0; + +static rd_kafka_queue_t *queue; /** Admin result queue. + * This is a global so we can + * yield in stop() */ +static volatile sig_atomic_t run = 1; + +/** + * @brief Signal termination of program + */ +static void stop(int sig) { + if (!run) { + fprintf(stderr, "%% Forced termination\n"); + exit(2); + } + run = 0; + rd_kafka_queue_yield(queue); +} + + +static void usage(const char *reason, ...) { + fprintf(stderr, + "Describe/Alter user SCRAM credentials\n" + "\n" + "Usage: %s \n" + " DESCRIBE ... \n" + " UPSERT " + " ... \n" + " DELETE ... \n" + "\n" + "Options:\n" + " -b Bootstrap server list to connect to.\n" + " -X Set librdkafka configuration property.\n" + " See CONFIGURATION.md for full list.\n" + " -d Enable librdkafka debugging (%s).\n" + "\n", + argv0, rd_kafka_get_debug_contexts()); + + if (reason) { + va_list ap; + char reasonbuf[512]; + + va_start(ap, reason); + vsnprintf(reasonbuf, sizeof(reasonbuf), reason, ap); + va_end(ap); + + fprintf(stderr, "ERROR: %s\n", reasonbuf); + } + + exit(reason ? 1 : 0); +} + +#define fatal(...) \ + do { \ + fprintf(stderr, "ERROR: "); \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, "\n"); \ + exit(2); \ + } while (0) + + +/** + * @brief Set config property. Exit on failure. + */ +static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { + char errstr[512]; + + if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) + fatal("Failed to set %s=%s: %s", name, val, errstr); +} + + +/** + * @brief Parse an integer or fail. + */ +int64_t parse_int(const char *what, const char *str) { + char *end; + unsigned long n = strtoull(str, &end, 0); + + if (end != str + strlen(str)) { + fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n", + what, str); + exit(1); + } + + return (int64_t)n; +} + +rd_kafka_ScramMechanism_t parse_mechanism(const char *arg) { + return !strcmp(arg, "SCRAM-SHA-256") + ? RD_KAFKA_SCRAM_MECHANISM_SHA_256 + : !strcmp(arg, "SCRAM-SHA-512") + ? RD_KAFKA_SCRAM_MECHANISM_SHA_512 + : RD_KAFKA_SCRAM_MECHANISM_UNKNOWN; +} + +static void print_descriptions( + const rd_kafka_UserScramCredentialsDescription_t **descriptions, + size_t description_cnt) { + size_t i; + printf("DescribeUserScramCredentials descriptions[%zu]\n", + description_cnt); + for (i = 0; i < description_cnt; i++) { + const rd_kafka_UserScramCredentialsDescription_t *description; + description = descriptions[i]; + const char *username; + const rd_kafka_error_t *error; + username = + rd_kafka_UserScramCredentialsDescription_user(description); + error = + rd_kafka_UserScramCredentialsDescription_error(description); + rd_kafka_resp_err_t err = rd_kafka_error_code(error); + printf(" Username: \"%s\" Error: \"%s\"\n", username, + rd_kafka_err2str(err)); + if (err) { + const char *errstr = rd_kafka_error_string(error); + printf(" ErrorMessage: \"%s\"\n", errstr); + } + size_t num_credentials = + rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count( + description); + size_t itr; + for (itr = 0; itr < num_credentials; itr++) { + const rd_kafka_ScramCredentialInfo_t *scram_credential = + rd_kafka_UserScramCredentialsDescription_scramcredentialinfo( + description, itr); + rd_kafka_ScramMechanism_t mechanism; + int32_t iterations; + mechanism = rd_kafka_ScramCredentialInfo_mechanism( + scram_credential); + iterations = rd_kafka_ScramCredentialInfo_iterations( + scram_credential); + switch (mechanism) { + case RD_KAFKA_SCRAM_MECHANISM_UNKNOWN: + printf( + " Mechanism is " + "UNKNOWN\n"); + break; + case RD_KAFKA_SCRAM_MECHANISM_SHA_256: + printf( + " Mechanism is " + "SCRAM-SHA-256\n"); + break; + case RD_KAFKA_SCRAM_MECHANISM_SHA_512: + printf( + " Mechanism is " + "SCRAM-SHA-512\n"); + break; + default: + printf( + " Mechanism does " + "not match enums\n"); + } + printf(" Iterations are %d\n", iterations); + } + } +} + +static void print_alteration_responses( + const rd_kafka_AlterUserScramCredentials_result_response_t **responses, + size_t responses_cnt) { + size_t i; + printf("AlterUserScramCredentials responses [%zu]:\n", responses_cnt); + for (i = 0; i < responses_cnt; i++) { + const rd_kafka_AlterUserScramCredentials_result_response_t + *response = responses[i]; + const char *username; + const rd_kafka_error_t *error; + username = + rd_kafka_AlterUserScramCredentials_result_response_user( + response); + error = + rd_kafka_AlterUserScramCredentials_result_response_error( + response); + rd_kafka_resp_err_t err = rd_kafka_error_code(error); + if (err) { + const char *errstr = rd_kafka_error_string(error); + printf(" Username: \"%s\", Error: \"%s\"\n", + username, rd_kafka_err2str(err)); + printf(" ErrorMessage: \"%s\"\n", errstr); + } else { + printf(" Username: \"%s\" Success\n", username); + } + } +} + +static void Describe(rd_kafka_t *rk, const char **users, size_t user_cnt) { + rd_kafka_event_t *event; + char errstr[512]; /* librdkafka API error reporting buffer */ + + rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS); + + if (rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))) { + fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); + return; + } + + /* NULL argument gives us all the users*/ + rd_kafka_DescribeUserScramCredentials(rk, users, user_cnt, options, + queue); + rd_kafka_AdminOptions_destroy(options); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); + if (!event) { + /* User hit Ctrl-C */ + fprintf(stderr, "%% Cancelled by user\n"); + + } else if (rd_kafka_event_error(event)) { + /* Request failed */ + fprintf(stderr, "%% DescribeUserScramCredentials failed: %s\n", + rd_kafka_event_error_string(event)); + + } else { + /* Request succeeded */ + const rd_kafka_DescribeUserScramCredentials_result_t *result; + const rd_kafka_UserScramCredentialsDescription_t **descriptions; + size_t description_cnt; + result = + rd_kafka_event_DescribeUserScramCredentials_result(event); + descriptions = + rd_kafka_DescribeUserScramCredentials_result_descriptions( + result, &description_cnt); + print_descriptions(descriptions, description_cnt); + } + rd_kafka_event_destroy(event); +} + +static void Alter(rd_kafka_t *rk, + rd_kafka_UserScramCredentialAlteration_t **alterations, + size_t alteration_cnt) { + rd_kafka_event_t *event; + char errstr[512]; /* librdkafka API error reporting buffer */ + + /* Set timeout (optional) */ + rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS); + + if (rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))) { + fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); + return; + } + + /* Call the AlterUserScramCredentials function*/ + rd_kafka_AlterUserScramCredentials(rk, alterations, alteration_cnt, + options, queue); + rd_kafka_AdminOptions_destroy(options); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); + if (!event) { + /* User hit Ctrl-C */ + fprintf(stderr, "%% Cancelled by user\n"); + + } else if (rd_kafka_event_error(event)) { + /* Request failed */ + fprintf(stderr, "%% AlterUserScramCredentials failed: %s\n", + rd_kafka_event_error_string(event)); + + } else { + /* Request succeeded */ + const rd_kafka_AlterUserScramCredentials_result_t *result = + rd_kafka_event_AlterUserScramCredentials_result(event); + const rd_kafka_AlterUserScramCredentials_result_response_t * + *responses; + size_t responses_cnt; + responses = rd_kafka_AlterUserScramCredentials_result_responses( + result, &responses_cnt); + + print_alteration_responses(responses, responses_cnt); + } + rd_kafka_event_destroy(event); +} + +static void cmd_user_scram(rd_kafka_conf_t *conf, int argc, const char **argv) { + char errstr[512]; /* librdkafka API error reporting buffer */ + rd_kafka_t *rk; /* Admin client instance */ + size_t i; + const int min_argc = 1; + const int args_rest = argc - min_argc; + + int is_describe = 0; + int is_upsert = 0; + int is_delete = 0; + + /* + * Argument validation + */ + int correct_argument_cnt = argc >= min_argc; + + if (!correct_argument_cnt) + usage("Wrong number of arguments"); + + is_describe = !strcmp(argv[0], "DESCRIBE"); + is_upsert = !strcmp(argv[0], "UPSERT"); + is_delete = !strcmp(argv[0], "DELETE"); + + correct_argument_cnt = is_describe || + (is_upsert && (args_rest % 5) == 0) || + (is_delete && (args_rest % 2) == 0) || 0; + + if (!correct_argument_cnt) + usage("Wrong number of arguments"); + + + /* + * Create an admin client, it can be created using any client type, + * so we choose producer since it requires no extra configuration + * and is more light-weight than the consumer. + * + * NOTE: rd_kafka_new() takes ownership of the conf object + * and the application must not reference it again after + * this call. + */ + rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if (!rk) { + fprintf(stderr, "%% Failed to create new producer: %s\n", + errstr); + exit(1); + } + + /* The Admin API is completely asynchronous, results are emitted + * on the result queue that is passed to DeleteRecords() */ + queue = rd_kafka_queue_new(rk); + + /* Signal handler for clean shutdown */ + signal(SIGINT, stop); + + if (is_describe) { + + /* Describe the users */ + Describe(rk, &argv[min_argc], argc - min_argc); + + } else if (is_upsert) { + size_t upsert_cnt = args_rest / 5; + const char **upsert_args = &argv[min_argc]; + rd_kafka_UserScramCredentialAlteration_t **upserts = + calloc(upsert_cnt, sizeof(*upserts)); + for (i = 0; i < upsert_cnt; i++) { + const char **upsert_args_curr = &upsert_args[i * 5]; + size_t salt_size = 0; + const char *username = upsert_args_curr[0]; + rd_kafka_ScramMechanism_t mechanism = + parse_mechanism(upsert_args_curr[1]); + int iterations = + parse_int("iterations", upsert_args_curr[2]); + const char *password = upsert_args_curr[3]; + const char *salt = upsert_args_curr[4]; + + if (strlen(salt) == 0) + salt = NULL; + else + salt_size = strlen(salt); + + upserts[i] = rd_kafka_UserScramCredentialUpsertion_new( + username, mechanism, iterations, + (const unsigned char *)password, strlen(password), + (const unsigned char *)salt, salt_size); + } + Alter(rk, upserts, upsert_cnt); + rd_kafka_UserScramCredentialAlteration_destroy_array( + upserts, upsert_cnt); + free(upserts); + } else { + size_t deletion_cnt = args_rest / 2; + const char **delete_args = &argv[min_argc]; + rd_kafka_UserScramCredentialAlteration_t **deletions = + calloc(deletion_cnt, sizeof(*deletions)); + for (i = 0; i < deletion_cnt; i++) { + const char **delete_args_curr = &delete_args[i * 2]; + rd_kafka_ScramMechanism_t mechanism = + parse_mechanism(delete_args_curr[1]); + const char *username = delete_args_curr[0]; + + deletions[i] = rd_kafka_UserScramCredentialDeletion_new( + username, mechanism); + } + Alter(rk, deletions, deletion_cnt); + rd_kafka_UserScramCredentialAlteration_destroy_array( + deletions, deletion_cnt); + free(deletions); + } + + signal(SIGINT, SIG_DFL); + + /* Destroy queue */ + rd_kafka_queue_destroy(queue); + + + /* Destroy the producer instance */ + rd_kafka_destroy(rk); +} + +int main(int argc, char **argv) { + rd_kafka_conf_t *conf; /**< Client configuration object */ + int opt; + argv0 = argv[0]; + + /* + * Create Kafka client configuration place-holder + */ + conf = rd_kafka_conf_new(); + + + /* + * Parse common options + */ + while ((opt = getopt(argc, argv, "b:X:d:")) != -1) { + switch (opt) { + case 'b': + conf_set(conf, "bootstrap.servers", optarg); + break; + + case 'X': { + char *name = optarg, *val; + + if (!(val = strchr(name, '='))) + fatal("-X expects a name=value argument"); + + *val = '\0'; + val++; + + conf_set(conf, name, val); + break; + } + + case 'd': + conf_set(conf, "debug", optarg); + break; + + default: + usage("Unknown option %c", (char)opt); + } + } + + cmd_user_scram(conf, argc - optind, (const char **)&argv[optind]); + return 0; +} diff --git a/src/rdkafka.c b/src/rdkafka.c index 4a8ec30dfb..a353f7b46f 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -700,7 +700,6 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = { _ERR_DESC(RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE, "Broker: Request principal deserialization failed during " "forwarding"), - _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)}; @@ -4694,8 +4693,8 @@ static void rd_kafka_DescribeGroups_resp_cb(rd_kafka_t *rk, rd_kafka_buf_read_str(reply, &MemberId); rd_kafka_buf_read_str(reply, &ClientId); rd_kafka_buf_read_str(reply, &ClientHost); - rd_kafka_buf_read_bytes(reply, &Meta); - rd_kafka_buf_read_bytes(reply, &Assignment); + rd_kafka_buf_read_kbytes(reply, &Meta); + rd_kafka_buf_read_kbytes(reply, &Assignment); mi->member_id = RD_KAFKAP_STR_DUP(&MemberId); mi->client_id = RD_KAFKAP_STR_DUP(&ClientId); diff --git a/src/rdkafka.h b/src/rdkafka.h index 0e14b8d273..2065e72533 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -909,7 +909,6 @@ typedef struct rd_kafka_topic_partition_s { * rd_kafka_t INSTANCES. */ } rd_kafka_topic_partition_t; - /** * @brief Destroy a rd_kafka_topic_partition_t. * @remark This must not be called for elements in a topic partition list. @@ -953,7 +952,6 @@ typedef struct rd_kafka_topic_partition_list_s { rd_kafka_topic_partition_t *elems; /**< Element array[] */ } rd_kafka_topic_partition_list_t; - /** * @brief Create a new list/vector Topic+Partition container. * @@ -971,7 +969,6 @@ typedef struct rd_kafka_topic_partition_list_s { RD_EXPORT rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new(int size); - /** * @brief Free all resources used by the list and the list itself. */ @@ -5369,6 +5366,10 @@ typedef int rd_kafka_event_type_t; #define RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT 0x10000 /** IncrementalAlterConfigs_result_t */ #define RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT 0x20000 +/** DescribeUserScramCredentials_result_t */ +#define RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT 0x40000 +/** AlterUserScramCredentials_result_t */ +#define RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT 0x80000 /** * @returns the event type for the given event. @@ -5637,6 +5638,10 @@ typedef rd_kafka_event_t rd_kafka_DeleteConsumerGroupOffsets_result_t; typedef rd_kafka_event_t rd_kafka_AlterConsumerGroupOffsets_result_t; /*! ListConsumerGroupOffsets result type */ typedef rd_kafka_event_t rd_kafka_ListConsumerGroupOffsets_result_t; +/*! DescribeUserScramCredentials result type */ +typedef rd_kafka_event_t rd_kafka_DescribeUserScramCredentials_result_t; +/*! AlterUserScramCredentials result type */ +typedef rd_kafka_event_t rd_kafka_AlterUserScramCredentials_result_t; /** * @brief Get CreateTopics result. @@ -5804,6 +5809,21 @@ rd_kafka_event_DescribeAcls_result(rd_kafka_event_t *rkev); RD_EXPORT const rd_kafka_DeleteAcls_result_t * rd_kafka_event_DeleteAcls_result(rd_kafka_event_t *rkev); +/** + * @brief Get ListConsumerGroupOffsets result. + * + * @returns the result of a ListConsumerGroupOffsets request, or NULL if + * event is of different type. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p rkev object. + * + * Event types: + * RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT + */ +RD_EXPORT const rd_kafka_ListConsumerGroupOffsets_result_t * +rd_kafka_event_ListConsumerGroupOffsets_result(rd_kafka_event_t *rkev); + /** * @brief Get AlterConsumerGroupOffsets result. * @@ -5820,19 +5840,34 @@ RD_EXPORT const rd_kafka_AlterConsumerGroupOffsets_result_t * rd_kafka_event_AlterConsumerGroupOffsets_result(rd_kafka_event_t *rkev); /** - * @brief Get ListConsumerGroupOffsets result. + * @brief Get DescribeUserScramCredentials result. * - * @returns the result of a ListConsumerGroupOffsets request, or NULL if + * @returns the result of a DescribeUserScramCredentials request, or NULL if * event is of different type. * * @remark The lifetime of the returned memory is the same * as the lifetime of the \p rkev object. * * Event types: - * RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT + * RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT */ -RD_EXPORT const rd_kafka_ListConsumerGroupOffsets_result_t * -rd_kafka_event_ListConsumerGroupOffsets_result(rd_kafka_event_t *rkev); +RD_EXPORT const rd_kafka_DescribeUserScramCredentials_result_t * +rd_kafka_event_DescribeUserScramCredentials_result(rd_kafka_event_t *rkev); + +/** + * @brief Get AlterUserScramCredentials result. + * + * @returns the result of a AlterUserScramCredentials request, or NULL if + * event is of different type. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p rkev object. + * + * Event types: + * RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT + */ +RD_EXPORT const rd_kafka_AlterUserScramCredentials_result_t * +rd_kafka_event_AlterUserScramCredentials_result(rd_kafka_event_t *rkev); /** * @brief Poll a queue for an event for max \p timeout_ms. @@ -6739,6 +6774,10 @@ typedef enum rd_kafka_admin_op_t { RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS, /** IncrementalAlterConfigs */ RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS, + /** DescribeUserScramCredentials */ + RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS, + /** AlterUserScramCredentials */ + RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS, RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ } rd_kafka_admin_op_t; @@ -8575,6 +8614,242 @@ rd_kafka_DeleteConsumerGroupOffsets_result_groups( /**@}*/ + +/** + * @name Admin API - User SCRAM credentials + * @{ + */ + +/** + * @enum rd_kafka_ScramMechanism_t + * @brief Apache Kafka ScramMechanism values. + */ +typedef enum rd_kafka_ScramMechanism_t { + RD_KAFKA_SCRAM_MECHANISM_UNKNOWN = 0, + RD_KAFKA_SCRAM_MECHANISM_SHA_256 = 1, + RD_KAFKA_SCRAM_MECHANISM_SHA_512 = 2, + RD_KAFKA_SCRAM_MECHANISM__CNT +} rd_kafka_ScramMechanism_t; + +/** + * @brief Scram credential info. + * Mechanism and iterations for a SASL/SCRAM + * credential associated with a user. + */ +typedef struct rd_kafka_ScramCredentialInfo_s rd_kafka_ScramCredentialInfo_t; + +/** + * @brief Returns the mechanism of a given ScramCredentialInfo. + */ +RD_EXPORT +rd_kafka_ScramMechanism_t rd_kafka_ScramCredentialInfo_mechanism( + const rd_kafka_ScramCredentialInfo_t *scram_credential_info); + +/** + * @brief Returns the iterations of a given ScramCredentialInfo. + */ +RD_EXPORT +int32_t rd_kafka_ScramCredentialInfo_iterations( + const rd_kafka_ScramCredentialInfo_t *scram_credential_info); + +/** + * @brief Representation of all SASL/SCRAM credentials associated + * with a user that can be retrieved, + * or an error indicating why credentials + * could not be retrieved. + */ +typedef struct rd_kafka_UserScramCredentialsDescription_s + rd_kafka_UserScramCredentialsDescription_t; + +/** + * @brief Returns the username of a UserScramCredentialsDescription. + */ +RD_EXPORT +const char *rd_kafka_UserScramCredentialsDescription_user( + const rd_kafka_UserScramCredentialsDescription_t *description); + +/** + * @brief Returns the error associated with a UserScramCredentialsDescription. + */ +RD_EXPORT +const rd_kafka_error_t *rd_kafka_UserScramCredentialsDescription_error( + const rd_kafka_UserScramCredentialsDescription_t *description); + +/** + * @brief Returns the count of ScramCredentialInfos of a + * UserScramCredentialsDescription. + */ +RD_EXPORT +size_t rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count( + const rd_kafka_UserScramCredentialsDescription_t *description); + +/** + * @brief Returns the ScramCredentialInfo at index idx of + * UserScramCredentialsDescription. + */ +RD_EXPORT +const rd_kafka_ScramCredentialInfo_t * +rd_kafka_UserScramCredentialsDescription_scramcredentialinfo( + const rd_kafka_UserScramCredentialsDescription_t *description, + size_t idx); + +/** + * @brief Get an array of descriptions from a DescribeUserScramCredentials + * result. + * + * The returned value life-time is the same as the \p result object. + * + * @param result Result to get descriptions from. + * @param cntp is updated to the number of elements in the array. + */ +RD_EXPORT +const rd_kafka_UserScramCredentialsDescription_t ** +rd_kafka_DescribeUserScramCredentials_result_descriptions( + const rd_kafka_DescribeUserScramCredentials_result_t *result, + size_t *cntp); + +/** + * @brief Describe SASL/SCRAM credentials. + * This operation is supported by brokers with version 2.7.0 or higher. + * + * @param rk Client instance. + * @param users The users for which credentials are to be described. + * All users' credentials are described if NULL. + * @param user_cnt Number of elements in \p users array. + * @param options Optional admin options, or NULL for defaults. + * @param rkqu Queue to emit result on. + */ +RD_EXPORT +void rd_kafka_DescribeUserScramCredentials( + rd_kafka_t *rk, + const char **users, + size_t user_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + +/** + * @brief A request to alter a user's SASL/SCRAM credentials. + */ +typedef struct rd_kafka_UserScramCredentialAlteration_s + rd_kafka_UserScramCredentialAlteration_t; + +/** + * @brief Allocates a new UserScramCredentialUpsertion given its fields. + * If salt isn't given a 64 B salt is generated using OpenSSL + * RAND_bytes, if available. + * + * @param username The username (not empty). + * @param mechanism SASL/SCRAM mechanism. + * @param iterations SASL/SCRAM iterations. + * @param password Password bytes (not empty). + * @param password_size Size of \p password (greater than 0). + * @param salt Salt bytes (optional). + * @param salt_size Size of \p salt (optional). + * + * @return A newly created instance of rd_kafka_UserScramCredentialAlteration_t. + * Ownership belongs to the caller, use + * rd_kafka_UserScramCredentialAlteration_destroy to destroy. + */ +RD_EXPORT +rd_kafka_UserScramCredentialAlteration_t * +rd_kafka_UserScramCredentialUpsertion_new(const char *username, + rd_kafka_ScramMechanism_t mechanism, + int32_t iterations, + const unsigned char *password, + size_t password_size, + const unsigned char *salt, + size_t salt_size); + +/** + * @brief Allocates a new UserScramCredentialDeletion given its fields. + * + * @param username The username (not empty). + * @param mechanism SASL/SCRAM mechanism. + * @return A newly created instance of rd_kafka_UserScramCredentialAlteration_t. + * Ownership belongs to the caller, use + * rd_kafka_UserScramCredentialAlteration_destroy to destroy. + */ +RD_EXPORT +rd_kafka_UserScramCredentialAlteration_t * +rd_kafka_UserScramCredentialDeletion_new(const char *username, + rd_kafka_ScramMechanism_t mechanism); + + +/** + * @brief Destroys a UserScramCredentialAlteration given its pointer + */ +RD_EXPORT +void rd_kafka_UserScramCredentialAlteration_destroy( + rd_kafka_UserScramCredentialAlteration_t *alteration); + +/** + * @brief Destroys an array of UserScramCredentialAlteration + */ +RD_EXPORT +void rd_kafka_UserScramCredentialAlteration_destroy_array( + rd_kafka_UserScramCredentialAlteration_t **alterations, + size_t alteration_cnt); + +/** + * @brief Result of a single user SCRAM alteration. + */ +typedef struct rd_kafka_AlterUserScramCredentials_result_response_s + rd_kafka_AlterUserScramCredentials_result_response_t; + +/** + * @brief Returns the username for a + * rd_kafka_AlterUserScramCredentials_result_response. + */ +RD_EXPORT +const char *rd_kafka_AlterUserScramCredentials_result_response_user( + const rd_kafka_AlterUserScramCredentials_result_response_t *response); + +/** + * @brief Returns the error of a + * rd_kafka_AlterUserScramCredentials_result_response. + */ +RD_EXPORT +const rd_kafka_error_t * +rd_kafka_AlterUserScramCredentials_result_response_error( + const rd_kafka_AlterUserScramCredentials_result_response_t *response); + +/** + * @brief Get an array of responses from a AlterUserScramCredentials result. + * + * The returned value life-time is the same as the \p result object. + * + * @param result Result to get responses from. + * @param cntp is updated to the number of elements in the array. + */ +RD_EXPORT +const rd_kafka_AlterUserScramCredentials_result_response_t ** +rd_kafka_AlterUserScramCredentials_result_responses( + const rd_kafka_AlterUserScramCredentials_result_t *result, + size_t *cntp); + +/** + * @brief Alter SASL/SCRAM credentials. + * This operation is supported by brokers with version 2.7.0 or higher. + * + * @remark For upsertions to be processed, librdkfka must be build with + * OpenSSL support. It's needed to calculate the HMAC. + * + * @param rk Client instance. + * @param alterations The alterations to be applied. + * @param alteration_cnt Number of elements in \p alterations array. + * @param options Optional admin options, or NULL for defaults. + * @param rkqu Queue to emit result on. + */ +RD_EXPORT +void rd_kafka_AlterUserScramCredentials( + rd_kafka_t *rk, + rd_kafka_UserScramCredentialAlteration_t **alterations, + size_t alteration_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + +/**@}*/ + /** * @name Admin API - ACL operations * @{ diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 6c4419b3a2..dfa38e55d0 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -3883,7 +3883,6 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req, return reply->rkbuf_err; } - /** * @brief Call when leaders have been queried to progress the DeleteRecords * admin op to its next phase, sending DeleteRecords to partition @@ -5089,6 +5088,796 @@ void rd_kafka_DescribeAcls(rd_kafka_t *rk, rd_kafka_q_enq(rk->rk_ops, rko); } +struct rd_kafka_ScramCredentialInfo_s { + rd_kafka_ScramMechanism_t mechanism; + int32_t iterations; +}; + +rd_kafka_ScramMechanism_t rd_kafka_ScramCredentialInfo_mechanism( + const rd_kafka_ScramCredentialInfo_t *scram_credential_info) { + return scram_credential_info->mechanism; +} + +int32_t rd_kafka_ScramCredentialInfo_iterations( + const rd_kafka_ScramCredentialInfo_t *scram_credential_info) { + return scram_credential_info->iterations; +} + +struct rd_kafka_UserScramCredentialsDescription_s { + char *user; + rd_kafka_error_t *error; + size_t credential_info_cnt; + rd_kafka_ScramCredentialInfo_t *credential_infos; +}; + +rd_kafka_UserScramCredentialsDescription_t * +rd_kafka_UserScramCredentialsDescription_new(const char *username, + size_t num_credentials) { + rd_kafka_UserScramCredentialsDescription_t *description; + description = rd_calloc(1, sizeof(*description)); + description->user = rd_strdup(username); + description->error = NULL; + description->credential_info_cnt = num_credentials; + description->credential_infos = NULL; + if (num_credentials > 0) { + rd_kafka_ScramCredentialInfo_t *credentialinfo; + description->credential_infos = + rd_calloc(num_credentials, sizeof(*credentialinfo)); + } + return description; +} + +void rd_kafka_UserScramCredentialsDescription_destroy( + rd_kafka_UserScramCredentialsDescription_t *description) { + if (!description) + return; + rd_free(description->user); + rd_kafka_error_destroy(description->error); + if (description->credential_infos) + rd_free(description->credential_infos); + rd_free(description); +} + +void rd_kafka_UserScramCredentialsDescription_destroy_free(void *description) { + rd_kafka_UserScramCredentialsDescription_destroy(description); +} + +void rd_kafka_UserScramCredentailsDescription_set_error( + rd_kafka_UserScramCredentialsDescription_t *description, + rd_kafka_resp_err_t errorcode, + const char *err) { + rd_kafka_error_destroy(description->error); + description->error = rd_kafka_error_new(errorcode, "%s", err); +} + +const char *rd_kafka_UserScramCredentialsDescription_user( + const rd_kafka_UserScramCredentialsDescription_t *description) { + return description->user; +} + +const rd_kafka_error_t *rd_kafka_UserScramCredentialsDescription_error( + const rd_kafka_UserScramCredentialsDescription_t *description) { + return description->error; +} + +size_t rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count( + const rd_kafka_UserScramCredentialsDescription_t *description) { + return description->credential_info_cnt; +} + +const rd_kafka_ScramCredentialInfo_t * +rd_kafka_UserScramCredentialsDescription_scramcredentialinfo( + const rd_kafka_UserScramCredentialsDescription_t *description, + size_t idx) { + return &description->credential_infos[idx]; +} + +const rd_kafka_UserScramCredentialsDescription_t ** +rd_kafka_DescribeUserScramCredentials_result_descriptions( + const rd_kafka_DescribeUserScramCredentials_result_t *result, + size_t *cntp) { + *cntp = rd_list_cnt(&result->rko_u.admin_result.results); + return (const rd_kafka_UserScramCredentialsDescription_t **) + result->rko_u.admin_result.results.rl_elems; +} + +rd_kafka_resp_err_t +rd_kafka_DescribeUserScramCredentialsRequest(rd_kafka_broker_t *rkb, + const rd_list_t *userlist, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int features; + size_t i; + size_t num_users; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DescribeUserScramCredentials, 0, 0, &features); + if (ApiVersion == -1) { + rd_snprintf( + errstr, errstr_size, + "DescribeUserScramCredentials API (KIP-554) not supported " + "by broker"); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + num_users = rd_list_cnt(userlist); + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_DescribeUserScramCredentials, 1, num_users * 25, + rd_true); + /* #Users */ + rd_kafka_buf_write_arraycnt(rkbuf, num_users); + for (i = 0; i < num_users; i++) { + rd_kafkap_str_t *user = rd_list_elem(userlist, i); + /* Name */ + rd_kafka_buf_write_str(rkbuf, user->str, user->len); + rd_kafka_buf_write_tags(rkbuf); + } + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + /* Last Tag buffer included automatically*/ + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +static rd_kafka_resp_err_t +rd_kafka_DescribeUserScramCredentialsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_op_t *rko_result = NULL; + int32_t num_users; + int16_t ErrorCode; + rd_kafkap_str_t ErrorMessage = RD_KAFKAP_STR_INITIALIZER; + int32_t i; + + rko_result = rd_kafka_admin_result_new(rko_req); + + /* ThrottleTimeMs */ + rd_kafka_buf_read_throttle_time(reply); + + /* ErrorCode */ + rd_kafka_buf_read_i16(reply, &ErrorCode); + rko_result->rko_err = ErrorCode; /*Request Level Error Code */ + + /* ErrorMessage */ + rd_kafka_buf_read_str(reply, &ErrorMessage); + if (ErrorCode) { + if (RD_KAFKAP_STR_LEN(&ErrorMessage) == 0) + errstr = (char *)rd_kafka_err2str(ErrorCode); + else + RD_KAFKAP_STR_DUPA(&errstr, &ErrorMessage); + rko_result->rko_u.admin_result.errstr = + errstr; /* Request Level Error string*/ + } + + /* #Results */ + rd_kafka_buf_read_arraycnt(reply, &num_users, 10000); + rd_list_init(&rko_result->rko_u.admin_result.results, num_users, + rd_kafka_UserScramCredentialsDescription_destroy_free); + + for (i = 0; i < num_users; i++) { + rd_kafkap_str_t User; + int16_t ErrorCode; + rd_kafkap_str_t ErrorMessage = RD_KAFKAP_STR_INITIALIZER; + size_t itr; + /* User */ + rd_kafka_buf_read_str(reply, &User); + /* ErrorCode */ + rd_kafka_buf_read_i16(reply, &ErrorCode); + /* ErrorMessage */ + rd_kafka_buf_read_str(reply, &ErrorMessage); + + int32_t num_credentials; + /* #CredentialInfos */ + rd_kafka_buf_read_arraycnt(reply, &num_credentials, 10000); + rd_kafka_UserScramCredentialsDescription_t *description = + rd_kafka_UserScramCredentialsDescription_new( + User.str, num_credentials); + rd_kafka_UserScramCredentailsDescription_set_error( + description, ErrorCode, ErrorMessage.str); + for (itr = 0; itr < (size_t)num_credentials; itr++) { + int8_t Mechanism; + int32_t Iterations; + /* Mechanism */ + rd_kafka_buf_read_i8(reply, &Mechanism); + /* Iterations */ + rd_kafka_buf_read_i32(reply, &Iterations); + rd_kafka_buf_skip_tags(reply); + rd_kafka_ScramCredentialInfo_t *scram_credential = + &description->credential_infos[itr]; + scram_credential->mechanism = Mechanism; + scram_credential->iterations = Iterations; + } + rd_kafka_buf_skip_tags(reply); + rd_list_add(&rko_result->rko_u.admin_result.results, + description); + } + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf( + errstr, errstr_size, + "DescribeUserScramCredentials response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +void rd_kafka_DescribeUserScramCredentials( + rd_kafka_t *rk, + const char **users, + size_t user_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + + rd_kafka_op_t *rko; + size_t i; + rd_list_t *userlist = NULL; + + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_DescribeUserScramCredentialsRequest, + rd_kafka_DescribeUserScramCredentialsResponse_parse, + }; + + rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS, + RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT, &cbs, options, + rkqu->rkqu_q); + + /* Check empty strings */ + for (i = 0; i < user_cnt; i++) { + if (!*users[i]) { + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Empty users aren't allowed, " + "index %" PRIusz, + i); + goto err; + } + } + + /* Check Duplicates */ + if (user_cnt > 1) { + userlist = rd_list_new(user_cnt, rd_free); + for (i = 0; i < user_cnt; i++) { + rd_list_add(userlist, rd_strdup(users[i])); + } + rd_list_sort(userlist, rd_strcmp2); + if (rd_list_find_duplicate(userlist, rd_strcmp2)) { + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate users aren't allowed " + "in the same request"); + goto err; + } + rd_list_destroy(userlist); + } + + rd_list_init(&rko->rko_u.admin_request.args, user_cnt, rd_free); + for (i = 0; i < user_cnt; i++) { + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafkap_str_new(users[i], -1)); + } + rd_kafka_q_enq(rk->rk_ops, rko); + return; +err: + RD_IF_FREE(userlist, rd_list_destroy); + rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); +} + +/** + * @enum rd_kafka_UserScramCredentialAlteration_type_t + * @brief Types of user SCRAM alterations. + */ +typedef enum rd_kafka_UserScramCredentialAlteration_type_s { + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_UPSERT = 0, + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_DELETE = 1, + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE__CNT +} rd_kafka_UserScramCredentialAlteration_type_t; + +struct rd_kafka_UserScramCredentialAlteration_s { + char *user; + rd_kafka_UserScramCredentialAlteration_type_t alteration_type; + union { + struct { + rd_kafka_ScramCredentialInfo_t credential_info; + rd_kafkap_bytes_t *salt; + rd_kafkap_bytes_t *password; + } upsertion; + struct { + rd_kafka_ScramMechanism_t mechanism; + } deletion; + } alteration; +}; + +rd_kafka_UserScramCredentialAlteration_t * +rd_kafka_UserScramCredentialUpsertion_new(const char *username, + rd_kafka_ScramMechanism_t mechanism, + int32_t iterations, + const unsigned char *password, + size_t password_size, + const unsigned char *salt, + size_t salt_size) { + rd_kafka_UserScramCredentialAlteration_t *alteration; + alteration = rd_calloc(1, sizeof(*alteration)); + alteration->user = rd_strdup(username); + alteration->alteration_type = + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_UPSERT; + alteration->alteration.upsertion.credential_info.mechanism = mechanism; + alteration->alteration.upsertion.credential_info.iterations = + iterations; + + alteration->alteration.upsertion.password = + rd_kafkap_bytes_new(password, password_size); + if (salt_size != 0) { + alteration->alteration.upsertion.salt = + rd_kafkap_bytes_new(salt, salt_size); + } else { +#if WITH_SSL + unsigned char random_salt[64]; + if (RAND_priv_bytes(random_salt, sizeof(random_salt)) == 1) { + alteration->alteration.upsertion.salt = + rd_kafkap_bytes_new(random_salt, + sizeof(random_salt)); + } +#endif + } + return alteration; +} + +rd_kafka_UserScramCredentialAlteration_t * +rd_kafka_UserScramCredentialDeletion_new(const char *username, + rd_kafka_ScramMechanism_t mechanism) { + rd_kafka_UserScramCredentialAlteration_t *alteration; + alteration = rd_calloc(1, sizeof(*alteration)); + alteration->user = rd_strdup(username); + alteration->alteration_type = + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_DELETE; + alteration->alteration.deletion.mechanism = mechanism; + return alteration; +} + +void rd_kafka_UserScramCredentialAlteration_destroy( + rd_kafka_UserScramCredentialAlteration_t *alteration) { + if (!alteration) + return; + rd_free(alteration->user); + if (alteration->alteration_type == + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_UPSERT) { + rd_kafkap_bytes_destroy(alteration->alteration.upsertion.salt); + rd_kafkap_bytes_destroy( + alteration->alteration.upsertion.password); + } + rd_free(alteration); +} + +void rd_kafka_UserScramCredentialAlteration_destroy_free(void *alteration) { + rd_kafka_UserScramCredentialAlteration_destroy(alteration); +} + +void rd_kafka_UserScramCredentialAlteration_destroy_array( + rd_kafka_UserScramCredentialAlteration_t **alterations, + size_t alteration_cnt) { + size_t i; + for (i = 0; i < alteration_cnt; i++) + rd_kafka_UserScramCredentialAlteration_destroy(alterations[i]); +} + +static rd_kafka_UserScramCredentialAlteration_t * +rd_kafka_UserScramCredentialAlteration_copy( + const rd_kafka_UserScramCredentialAlteration_t *alteration) { + rd_kafka_UserScramCredentialAlteration_t *copied_alteration = + rd_calloc(1, sizeof(*alteration)); + copied_alteration->user = rd_strdup(alteration->user); + copied_alteration->alteration_type = alteration->alteration_type; + + if (alteration->alteration_type == + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_UPSERT /*Upsert*/) { + copied_alteration->alteration.upsertion.salt = + rd_kafkap_bytes_copy(alteration->alteration.upsertion.salt); + copied_alteration->alteration.upsertion.password = + rd_kafkap_bytes_copy( + alteration->alteration.upsertion.password); + copied_alteration->alteration.upsertion.credential_info + .mechanism = + alteration->alteration.upsertion.credential_info.mechanism; + copied_alteration->alteration.upsertion.credential_info + .iterations = + alteration->alteration.upsertion.credential_info.iterations; + } else if ( + alteration->alteration_type == + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_DELETE /*Delete*/) { + copied_alteration->alteration.deletion.mechanism = + alteration->alteration.deletion.mechanism; + } + + return copied_alteration; +} + +struct rd_kafka_AlterUserScramCredentials_result_response_s { + char *user; + rd_kafka_error_t *error; +}; + +rd_kafka_AlterUserScramCredentials_result_response_t * +rd_kafka_AlterUserScramCredentials_result_response_new(const char *username) { + rd_kafka_AlterUserScramCredentials_result_response_t *response; + response = rd_calloc(1, sizeof(*response)); + response->user = rd_strdup(username); + response->error = NULL; + return response; +} + +void rd_kafka_AlterUserScramCredentials_result_response_destroy( + rd_kafka_AlterUserScramCredentials_result_response_t *response) { + if (response->user) + rd_free(response->user); + rd_kafka_error_destroy(response->error); + rd_free(response); +} + +void rd_kafka_AlterUserScramCredentials_result_response_destroy_free( + void *response) { + rd_kafka_AlterUserScramCredentials_result_response_destroy(response); +} + +void rd_kafka_AlterUserScramCredentials_result_response_set_error( + rd_kafka_AlterUserScramCredentials_result_response_t *response, + rd_kafka_resp_err_t errorcode, + const char *errstr) { + rd_kafka_error_destroy(response->error); + response->error = rd_kafka_error_new(errorcode, "%s", errstr); +} + +const char *rd_kafka_AlterUserScramCredentials_result_response_user( + const rd_kafka_AlterUserScramCredentials_result_response_t *response) { + return response->user; +} + +const rd_kafka_error_t * +rd_kafka_AlterUserScramCredentials_result_response_error( + const rd_kafka_AlterUserScramCredentials_result_response_t *response) { + return response->error; +} + +const rd_kafka_AlterUserScramCredentials_result_response_t ** +rd_kafka_AlterUserScramCredentials_result_responses( + const rd_kafka_AlterUserScramCredentials_result_t *result, + size_t *cntp) { + *cntp = rd_list_cnt(&result->rko_u.admin_result.results); + return (const rd_kafka_AlterUserScramCredentials_result_response_t **) + result->rko_u.admin_result.results.rl_elems; +} + + +#if WITH_SSL +static rd_kafkap_bytes_t * +rd_kafka_AlterUserScramCredentialsRequest_salted_password( + rd_kafka_broker_t *rkb, + rd_kafkap_bytes_t *salt, + rd_kafkap_bytes_t *password, + rd_kafka_ScramMechanism_t mechanism, + int32_t iterations) { + rd_chariov_t saltedpassword_chariov = {.ptr = + rd_alloca(EVP_MAX_MD_SIZE)}; + + rd_chariov_t salt_chariov; + salt_chariov.ptr = (char *)salt->data; + salt_chariov.size = RD_KAFKAP_BYTES_LEN(salt); + + rd_chariov_t password_chariov; + password_chariov.ptr = (char *)password->data; + password_chariov.size = RD_KAFKAP_BYTES_LEN(password); + + const EVP_MD *evp = NULL; + if (mechanism == RD_KAFKA_SCRAM_MECHANISM_SHA_256) + evp = EVP_sha256(); + else if (mechanism == RD_KAFKA_SCRAM_MECHANISM_SHA_512) + evp = EVP_sha512(); + rd_assert(evp != NULL); + + rd_kafka_ssl_hmac(rkb, evp, &password_chariov, &salt_chariov, + iterations, &saltedpassword_chariov); + + return rd_kafkap_bytes_new( + (const unsigned char *)saltedpassword_chariov.ptr, + saltedpassword_chariov.size); +} +#endif + +rd_kafka_resp_err_t rd_kafka_AlterUserScramCredentialsRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *user_scram_credential_alterations, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int features; + size_t num_deletions = 0; + size_t i; + size_t num_alterations; + size_t of_deletions; + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DescribeUserScramCredentials, 0, 0, &features); + if (ApiVersion == -1) { + rd_snprintf( + errstr, errstr_size, + "AlterUserScramCredentials API (KIP-554) not supported " + "by broker"); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + num_alterations = rd_list_cnt(user_scram_credential_alterations); + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_AlterUserScramCredentials, 1, num_alterations * 100, + rd_true); + + /* Deletion scram requests*/ + + /* #Deletions */ + of_deletions = rd_kafka_buf_write_arraycnt_pos(rkbuf); + + for (i = 0; i < num_alterations; i++) { + rd_kafka_UserScramCredentialAlteration_t *alteration = + rd_list_elem(user_scram_credential_alterations, i); + if (alteration->alteration_type != + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_DELETE) + continue; + + num_deletions++; + /* Name */ + rd_kafka_buf_write_str(rkbuf, alteration->user, + strlen(alteration->user)); + /* Mechanism */ + rd_kafka_buf_write_i8( + rkbuf, alteration->alteration.deletion.mechanism); + rd_kafka_buf_write_tags(rkbuf); + } + rd_kafka_buf_finalize_arraycnt(rkbuf, of_deletions, num_deletions); + + /* Upsertion scram request*/ + + /* #Upsertions */ + rd_kafka_buf_write_arraycnt(rkbuf, num_alterations - num_deletions); + for (i = 0; i < num_alterations; i++) { + rd_kafka_UserScramCredentialAlteration_t *alteration = + rd_list_elem(user_scram_credential_alterations, i); + if (alteration->alteration_type != + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_UPSERT) + continue; + +#if !WITH_SSL + rd_assert(!*"OpenSSL is required for upsertions"); +#else + char *user = alteration->user; + size_t usersize = strlen(user); + rd_kafka_ScramMechanism_t mechanism = + alteration->alteration.upsertion.credential_info.mechanism; + int32_t iterations = + alteration->alteration.upsertion.credential_info.iterations; + /* Name */ + rd_kafka_buf_write_str(rkbuf, user, usersize); + + /* Mechanism */ + rd_kafka_buf_write_i8(rkbuf, mechanism); + + /* Iterations */ + rd_kafka_buf_write_i32(rkbuf, iterations); + + /* Salt */ + rd_kafka_buf_write_kbytes( + rkbuf, alteration->alteration.upsertion.salt); + + rd_kafkap_bytes_t *password_bytes = + rd_kafka_AlterUserScramCredentialsRequest_salted_password( + rkb, alteration->alteration.upsertion.salt, + alteration->alteration.upsertion.password, mechanism, + iterations); + + /* SaltedPassword */ + rd_kafka_buf_write_kbytes(rkbuf, password_bytes); + rd_kafkap_bytes_destroy(password_bytes); + rd_kafka_buf_write_tags(rkbuf); +#endif + } + + rd_kafka_buf_write_tags(rkbuf); + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +rd_kafka_resp_err_t +rd_kafka_AlterUserScramCredentialsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_op_t *rko_result = NULL; + int32_t num_results; + int32_t i; + + rko_result = rd_kafka_admin_result_new(rko_req); + + /* ThrottleTimeMs */ + rd_kafka_buf_read_throttle_time(reply); + + /* #Results */ + rd_kafka_buf_read_arraycnt(reply, &num_results, 10000); + + rd_list_init( + &rko_result->rko_u.admin_result.results, num_results, + rd_kafka_AlterUserScramCredentials_result_response_destroy_free); + for (i = 0; i < num_results; i++) { + rd_kafkap_str_t User; + int16_t ErrorCode; + rd_kafkap_str_t ErrorMessage = RD_KAFKAP_STR_INITIALIZER; + + /* User */ + rd_kafka_buf_read_str(reply, &User); + + /* ErrorCode */ + rd_kafka_buf_read_i16(reply, &ErrorCode); + + /* ErrorMessage */ + rd_kafka_buf_read_str(reply, &ErrorMessage); + + rd_kafka_buf_skip_tags(reply); + + rd_kafka_AlterUserScramCredentials_result_response_t *response = + rd_kafka_AlterUserScramCredentials_result_response_new( + User.str); + rd_kafka_AlterUserScramCredentials_result_response_set_error( + response, ErrorCode, ErrorMessage.str); + rd_list_add(&rko_result->rko_u.admin_result.results, response); + } + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf( + errstr, errstr_size, + "AlterUserScramCredentials response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +void rd_kafka_AlterUserScramCredentials( + rd_kafka_t *rk, + rd_kafka_UserScramCredentialAlteration_t **alterations, + size_t alteration_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + + rd_kafka_op_t *rko; + size_t i; + + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_AlterUserScramCredentialsRequest, + rd_kafka_AlterUserScramCredentialsResponse_parse, + }; + + rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS, + RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT, &cbs, options, + rkqu->rkqu_q); + + if (alteration_cnt > 0) { + const char *errstr = NULL; + for (i = 0; i < alteration_cnt; i++) { + rd_bool_t is_upsert = + alterations[i]->alteration_type == + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_UPSERT; + rd_bool_t is_delete = + alterations[i]->alteration_type == + RD_KAFKA_USER_SCRAM_CREDENTIAL_ALTERATION_TYPE_DELETE; + + if ((is_upsert || is_delete) && + alterations[i] + ->alteration.upsertion.credential_info + .mechanism == + RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) { + errstr = + "SCRAM mechanism must be specified at " + "index %" PRIusz; + break; + } + + + if (!alterations[i]->user || !*alterations[i]->user) { + errstr = "Empty user at index %" PRIusz; + break; + } + + if (is_upsert) { +#if !WITH_SSL + errstr = + "OpenSSL required for upsertion at index " + "%" PRIusz; + break; +#endif + if (RD_KAFKAP_BYTES_LEN( + alterations[i] + ->alteration.upsertion.password) == + 0) { + errstr = + "Empty password at index %" PRIusz; + break; + } + + if (!alterations[i] + ->alteration.upsertion.salt || + RD_KAFKAP_BYTES_LEN( + alterations[i] + ->alteration.upsertion.salt) == 0) { + errstr = "Empty salt at index %" PRIusz; + break; + } + + if (alterations[i] + ->alteration.upsertion.credential_info + .iterations <= 0) { + errstr = + "Non-positive iterations at index " + "%" PRIusz; + break; + } + } + } + + if (errstr) { + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, errstr, i); + rd_kafka_admin_common_worker_destroy( + rk, rko, rd_true /*destroy*/); + return; + } + } else { + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "At least one alteration is required"); + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_true /*destroy*/); + return; + } + + rd_list_init(&rko->rko_u.admin_request.args, alteration_cnt, + rd_kafka_UserScramCredentialAlteration_destroy_free); + + for (i = 0; i < alteration_cnt; i++) { + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_UserScramCredentialAlteration_copy( + alterations[i])); + } + rd_kafka_q_enq(rk->rk_ops, rko); + return; +} + /** * @brief Get an array of rd_kafka_AclBinding_t from a DescribeAcls result. * @@ -5586,7 +6375,6 @@ void rd_kafka_AlterConsumerGroupOffsets( rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); } - /** * @brief Get an array of group results from a AlterGroups result. * @@ -6713,8 +7501,8 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, } rd_kafka_buf_read_str(reply, &ClientId); rd_kafka_buf_read_str(reply, &ClientHost); - rd_kafka_buf_read_bytes(reply, &MemberMetadata); - rd_kafka_buf_read_bytes(reply, &MemberAssignment); + rd_kafka_buf_read_kbytes(reply, &MemberMetadata); + rd_kafka_buf_read_kbytes(reply, &MemberAssignment); if (error != NULL) continue; diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 380f49dd0c..05fbf8db97 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -34,8 +34,15 @@ #include "rdmap.h" #include "rdkafka_error.h" #include "rdkafka_confval.h" - - +#if WITH_SSL +typedef struct rd_kafka_broker_s rd_kafka_broker_t; +extern int rd_kafka_ssl_hmac(rd_kafka_broker_t *rkb, + const EVP_MD *evp, + const rd_chariov_t *in, + const rd_chariov_t *salt, + int itcnt, + rd_chariov_t *out); +#endif /** * @brief Common AdminOptions type used for all admin APIs. diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h index cedcf22919..7845beff90 100644 --- a/src/rdkafka_buf.h +++ b/src/rdkafka_buf.h @@ -711,12 +711,21 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */ rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen)); \ } while (0) -/* Read Kafka Bytes representation (4+N). - * The 'kbytes' will be updated to point to rkbuf data */ -#define rd_kafka_buf_read_bytes(rkbuf, kbytes) \ +/** + * Read Kafka COMPACT_BYTES representation (VARINT+N) or + * standard BYTES representation(4+N). + * The 'kbytes' will be updated to point to rkbuf data. + */ +#define rd_kafka_buf_read_kbytes(rkbuf, kbytes) \ do { \ - int _klen; \ - rd_kafka_buf_read_i32a(rkbuf, _klen); \ + int32_t _klen; \ + if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) { \ + rd_kafka_buf_read_i32a(rkbuf, _klen); \ + } else { \ + uint64_t _uva; \ + rd_kafka_buf_read_uvarint(rkbuf, &_uva); \ + _klen = ((int32_t)_uva) - 1; \ + } \ (kbytes)->len = _klen; \ if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \ (kbytes)->data = NULL; \ @@ -728,7 +737,6 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */ rd_kafka_buf_check_len(rkbuf, _klen); \ } while (0) - /** * @brief Read \p size bytes from buffer, setting \p *ptr to the start * of the memory region. @@ -745,7 +753,7 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */ /** * @brief Read varint-lengted Kafka Bytes representation */ -#define rd_kafka_buf_read_bytes_varint(rkbuf, kbytes) \ +#define rd_kafka_buf_read_kbytes_varint(rkbuf, kbytes) \ do { \ int64_t _len2; \ size_t _r = \ @@ -1304,30 +1312,40 @@ static RD_INLINE void rd_kafka_buf_push_kstr(rd_kafka_buf_t *rkbuf, static RD_INLINE size_t rd_kafka_buf_write_kbytes(rd_kafka_buf_t *rkbuf, const rd_kafkap_bytes_t *kbytes) { - size_t len; + size_t len, r; - if (!kbytes || RD_KAFKAP_BYTES_IS_NULL(kbytes)) - return rd_kafka_buf_write_i32(rkbuf, -1); + if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) { + if (!kbytes || RD_KAFKAP_BYTES_IS_NULL(kbytes)) + return rd_kafka_buf_write_i32(rkbuf, -1); - if (RD_KAFKAP_BYTES_IS_SERIALIZED(kbytes)) - return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes), - RD_KAFKAP_BYTES_SIZE(kbytes)); + if (RD_KAFKAP_BYTES_IS_SERIALIZED(kbytes)) + return rd_kafka_buf_write(rkbuf, + RD_KAFKAP_BYTES_SER(kbytes), + RD_KAFKAP_BYTES_SIZE(kbytes)); - len = RD_KAFKAP_BYTES_LEN(kbytes); - rd_kafka_buf_write_i32(rkbuf, (int32_t)len); - rd_kafka_buf_write(rkbuf, kbytes->data, len); + len = RD_KAFKAP_BYTES_LEN(kbytes); + rd_kafka_buf_write_i32(rkbuf, (int32_t)len); + rd_kafka_buf_write(rkbuf, kbytes->data, len); - return 4 + len; -} + return 4 + len; + } -/** - * Push (i.e., no copy) Kafka bytes to buffer iovec - */ -static RD_INLINE void -rd_kafka_buf_push_kbytes(rd_kafka_buf_t *rkbuf, - const rd_kafkap_bytes_t *kbytes) { - rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes), - RD_KAFKAP_BYTES_SIZE(kbytes), NULL); + /* COMPACT_BYTES lengths are: + * 0 = NULL, + * 1 = empty + * N.. = length + 1 + */ + if (!kbytes) + len = 0; + else + len = kbytes->len + 1; + + r = rd_kafka_buf_write_uvarint(rkbuf, (uint64_t)len); + if (len > 1) { + rd_kafka_buf_write(rkbuf, kbytes->data, len - 1); + r += len - 1; + } + return r; } /** @@ -1410,5 +1428,4 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf, rd_kafka_make_req_cb_t *make_cb, void *make_opaque, void (*free_make_opaque_cb)(void *make_opaque)); - #endif /* _RDKAFKA_BUF_H_ */ diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 8d150fc59b..9926f8632c 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1512,7 +1512,7 @@ static void rd_kafka_cgrp_handle_SyncGroup_memberstate( if (!(assignment = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields))) goto err_parse; - rd_kafka_buf_read_bytes(rkbuf, &UserData); + rd_kafka_buf_read_kbytes(rkbuf, &UserData); done: rd_kafka_cgrp_update_session_timeout(rkcg, rd_true /*reset timeout*/); @@ -1617,7 +1617,7 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk, rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - rd_kafka_buf_read_bytes(rkbuf, &MemberState); + rd_kafka_buf_read_kbytes(rkbuf, &MemberState); err: actions = rd_kafka_err_action(rkb, ErrorCode, request, @@ -1803,7 +1803,7 @@ static int rd_kafka_group_MemberMetadata_consumer_read( rkgm->rkgm_subscription, topic_name, RD_KAFKA_PARTITION_UA); } - rd_kafka_buf_read_bytes(rkbuf, &UserData); + rd_kafka_buf_read_kbytes(rkbuf, &UserData); rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData); const rd_kafka_topic_partition_field_t fields[] = { @@ -1990,7 +1990,7 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, rd_kafka_buf_read_str(rkbuf, &MemberId); if (request->rkbuf_reqhdr.ApiVersion >= 5) rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); - rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata); + rd_kafka_buf_read_kbytes(rkbuf, &MemberMetadata); rkgm = &members[sub_cnt]; rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId); diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c index b2a6843ca2..28e602b23b 100644 --- a/src/rdkafka_event.c +++ b/src/rdkafka_event.c @@ -86,6 +86,10 @@ const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) { return "ListConsumerGroupOffsetsResult"; case RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH: return "SaslOAuthBearerTokenRefresh"; + case RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT: + return "DescribeUserScramCredentials"; + case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: + return "AlterUserScramCredentials"; default: return "?unknown?"; } @@ -427,6 +431,25 @@ rd_kafka_event_AlterConsumerGroupOffsets_result(rd_kafka_event_t *rkev) { const rd_kafka_AlterConsumerGroupOffsets_result_t *)rkev; } +const rd_kafka_DescribeUserScramCredentials_result_t * +rd_kafka_event_DescribeUserScramCredentials_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != + RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT) + return NULL; + else + return ( + const rd_kafka_DescribeUserScramCredentials_result_t *)rkev; +} + +const rd_kafka_AlterUserScramCredentials_result_t * +rd_kafka_event_AlterUserScramCredentials_result(rd_kafka_event_t *rkev) { + if (!rkev || + rkev->rko_evtype != RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT) + return NULL; + else + return ( + const rd_kafka_AlterUserScramCredentials_result_t *)rkev; +} const rd_kafka_ListConsumerGroupOffsets_result_t * rd_kafka_event_ListConsumerGroupOffsets_result(rd_kafka_event_t *rkev) { if (!rkev || diff --git a/src/rdkafka_event.h b/src/rdkafka_event.h index 52c2d191a2..aa8e4c6270 100644 --- a/src/rdkafka_event.h +++ b/src/rdkafka_event.h @@ -111,6 +111,8 @@ static RD_UNUSED RD_INLINE int rd_kafka_event_setup(rd_kafka_t *rk, case RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT: case RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT: case RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH: + case RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT: + case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: return 1; default: diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index efebd33da5..f3d9f1134a 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -97,7 +97,7 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, mpart = rd_kafka_mock_partition_find(mtopic, Partition); - rd_kafka_buf_read_bytes(rkbuf, &records); + rd_kafka_buf_read_kbytes(rkbuf, &records); /* Response: Partition */ rd_kafka_buf_write_i32(resp, Partition); @@ -353,9 +353,10 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, if (mset && partsize < (size_t)PartMaxBytes && totsize < (size_t)MaxBytes) { /* Response: Records */ - rd_kafka_buf_write_kbytes(resp, &mset->bytes); - partsize += RD_KAFKAP_BYTES_SIZE(&mset->bytes); - totsize += RD_KAFKAP_BYTES_SIZE(&mset->bytes); + size_t written = rd_kafka_buf_write_kbytes( + resp, &mset->bytes); + partsize += written; + totsize += written; /* FIXME: Multiple messageSets ? */ } else { @@ -1166,7 +1167,7 @@ static int rd_kafka_mock_handle_JoinGroup(rd_kafka_mock_connection_t *mconn, rd_kafkap_str_t ProtocolName; rd_kafkap_bytes_t Metadata; rd_kafka_buf_read_str(rkbuf, &ProtocolName); - rd_kafka_buf_read_bytes(rkbuf, &Metadata); + rd_kafka_buf_read_kbytes(rkbuf, &Metadata); protos[i].name = rd_kafkap_str_copy(&ProtocolName); protos[i].metadata = rd_kafkap_bytes_copy(&Metadata); } @@ -1454,7 +1455,7 @@ static int rd_kafka_mock_handle_SyncGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_cgrp_member_t *member2; rd_kafka_buf_read_str(rkbuf, &MemberId2); - rd_kafka_buf_read_bytes(rkbuf, &Metadata); + rd_kafka_buf_read_kbytes(rkbuf, &Metadata); if (err) continue; diff --git a/src/rdkafka_msgset_reader.c b/src/rdkafka_msgset_reader.c index 8b23d23ca7..c1b08fbbcd 100644 --- a/src/rdkafka_msgset_reader.c +++ b/src/rdkafka_msgset_reader.c @@ -631,10 +631,10 @@ rd_kafka_msgset_reader_msg_v0_1(rd_kafka_msgset_reader_t *msetr) { /* Extract key */ - rd_kafka_buf_read_bytes(rkbuf, &Key); + rd_kafka_buf_read_kbytes(rkbuf, &Key); /* Extract Value */ - rd_kafka_buf_read_bytes(rkbuf, &Value); + rd_kafka_buf_read_kbytes(rkbuf, &Value); Value_len = RD_KAFKAP_BYTES_LEN(&Value); /* MessageSets may contain offsets earlier than we @@ -894,8 +894,8 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) { /* Note: messages in aborted transactions are skipped at the MessageSet * level */ - rd_kafka_buf_read_bytes_varint(rkbuf, &hdr.Key); - rd_kafka_buf_read_bytes_varint(rkbuf, &hdr.Value); + rd_kafka_buf_read_kbytes_varint(rkbuf, &hdr.Key); + rd_kafka_buf_read_kbytes_varint(rkbuf, &hdr.Value); /* We parse the Headers later, just store the size (possibly truncated) * and pointer to the headers. */ diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 32cf4b3623..6ecb6cd14c 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -110,6 +110,10 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { [RD_KAFKA_OP_LEADERS] = "REPLY:LEADERS", [RD_KAFKA_OP_BARRIER] = "REPLY:BARRIER", [RD_KAFKA_OP_SASL_REAUTH] = "REPLY:SASL_REAUTH", + [RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS] = + "REPLY:ALTERUSERSCRAMCREDENTIALS", + [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = + "REPLY:DESCRIBEUSERSCRAMCREDENTIALS", }; if (type & RD_KAFKA_OP_REPLY) @@ -262,6 +266,10 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { [RD_KAFKA_OP_LEADERS] = sizeof(rko->rko_u.leaders), [RD_KAFKA_OP_BARRIER] = _RD_KAFKA_OP_EMPTY, [RD_KAFKA_OP_SASL_REAUTH] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS] = + sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = + sizeof(rko->rko_u.admin_request), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -408,6 +416,8 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { case RD_KAFKA_OP_DELETEACLS: case RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS: case RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS: + case RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS: + case RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS: rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq); rd_list_destroy(&rko->rko_u.admin_request.args); if (rko->rko_u.admin_request.options.match_consumer_group_states diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index f9ccec2373..6018a2659d 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -166,6 +166,12 @@ typedef enum { RD_KAFKA_OP_LEADERS, /**< Partition leader query */ RD_KAFKA_OP_BARRIER, /**< Version barrier bump */ RD_KAFKA_OP_SASL_REAUTH, /**< Sasl reauthentication for broker */ + RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS, /* < Admin: + DescribeUserScramCredentials + u.admin_request >*/ + RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS, /* < Admin: + AlterUserScramCredentials + u.admin_request >*/ RD_KAFKA_OP__END } rd_kafka_op_type_t; diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index ab40168ac3..1a9066d3d9 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -2487,7 +2487,6 @@ void rd_kafka_topic_partition_get(const rd_kafka_topic_partition_t *rktpar, } - /** * * rd_kafka_topic_partition_t lists @@ -2766,7 +2765,6 @@ void rd_kafka_topic_partition_list_destroy_free(void *ptr) { (rd_kafka_topic_partition_list_t *)ptr); } - /** * @brief Add a partition to an rktpar list. * The list must have enough room to fit it. diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index cac898a55c..24fce04106 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -378,7 +378,7 @@ typedef struct rd_kafkap_bytes_s { int32_t len; /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */ const void *data; /* points just past the struct, or other memory, * not NULL-terminated */ - const char _data[1]; /* Bytes following struct when new()ed */ + const unsigned char _data[1]; /* Bytes following struct when new()ed */ } rd_kafkap_bytes_t; @@ -423,7 +423,7 @@ static RD_UNUSED void rd_kafkap_bytes_destroy(rd_kafkap_bytes_t *kbytes) { * - No-copy, just alloc (bytes==NULL,len>0) */ static RD_INLINE RD_UNUSED rd_kafkap_bytes_t * -rd_kafkap_bytes_new(const char *bytes, int32_t len) { +rd_kafkap_bytes_new(const unsigned char *bytes, int32_t len) { rd_kafkap_bytes_t *kbytes; int32_t klen; @@ -440,7 +440,7 @@ rd_kafkap_bytes_new(const char *bytes, int32_t len) { if (len == RD_KAFKAP_BYTES_LEN_NULL) kbytes->data = NULL; else { - kbytes->data = ((const char *)(kbytes + 1)) + 4; + kbytes->data = ((const unsigned char *)(kbytes + 1)) + 4; if (bytes) memcpy((void *)kbytes->data, bytes, len); } @@ -455,7 +455,7 @@ rd_kafkap_bytes_new(const char *bytes, int32_t len) { */ static RD_INLINE RD_UNUSED rd_kafkap_bytes_t * rd_kafkap_bytes_copy(const rd_kafkap_bytes_t *src) { - return rd_kafkap_bytes_new((const char *)src->data, src->len); + return rd_kafkap_bytes_new((const unsigned char *)src->data, src->len); } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index a2b6656de1..8d0789cfc7 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2610,7 +2610,7 @@ void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk, goto err; } - rd_kafka_buf_read_bytes(rkbuf, &auth_data); + rd_kafka_buf_read_kbytes(rkbuf, &auth_data); if (request->rkbuf_reqhdr.ApiVersion >= 1) { int64_t session_lifetime_ms; diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 6f08e7a8a6..097b2fcb36 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -430,7 +430,6 @@ rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb, int unittest_request(void); - rd_kafka_resp_err_t rd_kafka_DeleteRecordsRequest(rd_kafka_broker_t *rkb, /*(rd_topic_partition_list_t*)*/ diff --git a/src/rdkafka_sasl_scram.c b/src/rdkafka_sasl_scram.c index 1a4aebb835..32f13a4c04 100644 --- a/src/rdkafka_sasl_scram.c +++ b/src/rdkafka_sasl_scram.c @@ -256,8 +256,6 @@ static int rd_kafka_sasl_scram_HMAC(rd_kafka_transport_t *rktrans, return 0; } - - /** * @brief Perform \p itcnt iterations of HMAC() on the given buffer \p in * using \p salt, writing the output into \p out which must be @@ -269,57 +267,14 @@ static int rd_kafka_sasl_scram_Hi(rd_kafka_transport_t *rktrans, const rd_chariov_t *salt, int itcnt, rd_chariov_t *out) { + rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; const EVP_MD *evp = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp; - unsigned int ressize = 0; - unsigned char tempres[EVP_MAX_MD_SIZE]; - unsigned char *saltplus; - int i; - - /* U1 := HMAC(str, salt + INT(1)) */ - saltplus = rd_alloca(salt->size + 4); - memcpy(saltplus, salt->ptr, salt->size); - saltplus[salt->size] = 0; - saltplus[salt->size + 1] = 0; - saltplus[salt->size + 2] = 0; - saltplus[salt->size + 3] = 1; - - /* U1 := HMAC(str, salt + INT(1)) */ - if (!HMAC(evp, (const unsigned char *)in->ptr, (int)in->size, saltplus, - salt->size + 4, tempres, &ressize)) { - rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM", - "HMAC priming failed"); - return -1; - } - - memcpy(out->ptr, tempres, ressize); - - /* Ui-1 := HMAC(str, Ui-2) .. */ - for (i = 1; i < itcnt; i++) { - unsigned char tempdest[EVP_MAX_MD_SIZE]; - int j; - - if (unlikely(!HMAC(evp, (const unsigned char *)in->ptr, - (int)in->size, tempres, ressize, tempdest, - NULL))) { - rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM", - "Hi() HMAC #%d/%d failed", i, itcnt); - return -1; - } - - /* U1 XOR U2 .. */ - for (j = 0; j < (int)ressize; j++) { - out->ptr[j] ^= tempdest[j]; - tempres[j] = tempdest[j]; - } - } - - out->size = ressize; - - return 0; + return rd_kafka_ssl_hmac(rkb, evp, in, salt, itcnt, out); } + /** * @returns a SASL value-safe-char encoded string, replacing "," and "=" * with their escaped counterparts in a newly allocated string. diff --git a/src/rdkafka_ssl.c b/src/rdkafka_ssl.c index 19178c84b3..85f745cb9c 100644 --- a/src/rdkafka_ssl.c +++ b/src/rdkafka_ssl.c @@ -1848,3 +1848,56 @@ void rd_kafka_ssl_init(void) { OpenSSL_add_all_algorithms(); #endif } + +int rd_kafka_ssl_hmac(rd_kafka_broker_t *rkb, + const EVP_MD *evp, + const rd_chariov_t *in, + const rd_chariov_t *salt, + int itcnt, + rd_chariov_t *out) { + unsigned int ressize = 0; + unsigned char tempres[EVP_MAX_MD_SIZE]; + unsigned char *saltplus; + int i; + + /* U1 := HMAC(str, salt + INT(1)) */ + saltplus = rd_alloca(salt->size + 4); + memcpy(saltplus, salt->ptr, salt->size); + saltplus[salt->size] = 0; + saltplus[salt->size + 1] = 0; + saltplus[salt->size + 2] = 0; + saltplus[salt->size + 3] = 1; + + /* U1 := HMAC(str, salt + INT(1)) */ + if (!HMAC(evp, (const unsigned char *)in->ptr, (int)in->size, saltplus, + salt->size + 4, tempres, &ressize)) { + rd_rkb_dbg(rkb, SECURITY, "SSLHMAC", "HMAC priming failed"); + return -1; + } + + memcpy(out->ptr, tempres, ressize); + + /* Ui-1 := HMAC(str, Ui-2) .. */ + for (i = 1; i < itcnt; i++) { + unsigned char tempdest[EVP_MAX_MD_SIZE]; + int j; + + if (unlikely(!HMAC(evp, (const unsigned char *)in->ptr, + (int)in->size, tempres, ressize, tempdest, + NULL))) { + rd_rkb_dbg(rkb, SECURITY, "SSLHMAC", + "Hi() HMAC #%d/%d failed", i, itcnt); + return -1; + } + + /* U1 XOR U2 .. */ + for (j = 0; j < (int)ressize; j++) { + out->ptr[j] ^= tempdest[j]; + tempres[j] = tempdest[j]; + } + } + + out->size = ressize; + + return 0; +} diff --git a/src/rdkafka_ssl.h b/src/rdkafka_ssl.h index 9fb07e3312..4dce0b1f87 100644 --- a/src/rdkafka_ssl.h +++ b/src/rdkafka_ssl.h @@ -54,4 +54,11 @@ void rd_kafka_ssl_init(void); const char *rd_kafka_ssl_last_error_str(void); +int rd_kafka_ssl_hmac(rd_kafka_broker_t *rkb, + const EVP_MD *evp, + const rd_chariov_t *in, + const rd_chariov_t *salt, + int itcnt, + rd_chariov_t *out); + #endif /* _RDKAFKA_SSL_H_ */ diff --git a/tests/0080-admin_ut.c b/tests/0080-admin_ut.c index e187297b84..66693d3fdd 100644 --- a/tests/0080-admin_ut.c +++ b/tests/0080-admin_ut.c @@ -1988,6 +1988,140 @@ static void do_test_ListConsumerGroupOffsets(const char *what, SUB_TEST_PASS(); } +static void do_test_DescribeUserScramCredentials(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq) { + char errstr[512]; + rd_kafka_AdminOptions_t *options; + rd_kafka_event_t *rkev; + rd_kafka_queue_t *rkqu; + + SUB_TEST_QUICK("%s", what); + + rkqu = useq ? useq : rd_kafka_queue_new(rk); + + const char *users[2]; + users[0] = "Sam"; + users[1] = "Sam"; + + /* Whenever a duplicate user is passed, + * the request should fail with error code + * RD_KAFKA_RESP_ERR__INVALID_ARG */ + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS); + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + rd_kafka_DescribeUserScramCredentials(rk, users, RD_ARRAY_SIZE(users), + options, rkqu); + rd_kafka_AdminOptions_destroy(options); + + rkev = test_wait_admin_result( + rkqu, RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT, 2000); + + TEST_ASSERT( + rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__INVALID_ARG, + "Expected \"Local: Invalid argument or configuration\", not %s", + rd_kafka_err2str(rd_kafka_event_error(rkev))); + + rd_kafka_event_destroy(rkev); + + if (!useq) + rd_kafka_queue_destroy(rkqu); + + SUB_TEST_PASS(); +} + +static void do_test_AlterUserScramCredentials(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq) { + char errstr[512]; + rd_kafka_AdminOptions_t *options; + rd_kafka_event_t *rkev; + rd_kafka_queue_t *rkqu; + + SUB_TEST_QUICK("%s", what); + + rkqu = useq ? useq : rd_kafka_queue_new(rk); + +#if !WITH_SSL + /* Whenever librdkafka wasn't built with OpenSSL, + * the request should fail with error code + * RD_KAFKA_RESP_ERR__INVALID_ARG */ + rd_kafka_UserScramCredentialAlteration_t *alterations_ssl[1]; + alterations_ssl[0] = rd_kafka_UserScramCredentialUpsertion_new( + "user", RD_KAFKA_SCRAM_MECHANISM_SHA_256, 10000, + (unsigned char *)"password", 8, (unsigned char *)"salt", 4); + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS); + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + rd_kafka_AlterUserScramCredentials(rk, alterations_ssl, 1, options, + rkqu); + rd_kafka_UserScramCredentialAlteration_destroy_array( + alterations_ssl, RD_ARRAY_SIZE(alterations_ssl)); + rd_kafka_AdminOptions_destroy(options); + + rkev = test_wait_admin_result( + rkqu, RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT, 2000); + + TEST_ASSERT( + rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__INVALID_ARG, + "Expected \"Local: Invalid argument or configuration\", not %s", + rd_kafka_err2str(rd_kafka_event_error(rkev))); + + rd_kafka_event_destroy(rkev); +#endif + + rd_kafka_UserScramCredentialAlteration_t *alterations[1]; + alterations[0] = rd_kafka_UserScramCredentialDeletion_new( + "", RD_KAFKA_SCRAM_MECHANISM_SHA_256); + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS); + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + /* Whenever an empty array is passed, + * the request should fail with error code + * RD_KAFKA_RESP_ERR__INVALID_ARG */ + rd_kafka_AlterUserScramCredentials(rk, alterations, 0, options, rkqu); + + rkev = test_wait_admin_result( + rkqu, RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT, 2000); + + TEST_ASSERT( + rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__INVALID_ARG, + "Expected \"Local: Invalid argument or configuration\", not %s", + rd_kafka_err2str(rd_kafka_event_error(rkev))); + + rd_kafka_event_destroy(rkev); + + /* Whenever an empty user is passed, + * the request should fail with error code + * RD_KAFKA_RESP_ERR__INVALID_ARG */ + rd_kafka_AlterUserScramCredentials( + rk, alterations, RD_ARRAY_SIZE(alterations), options, rkqu); + rkev = test_wait_admin_result( + rkqu, RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT, 2000); + + TEST_ASSERT( + rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__INVALID_ARG, + "Expected \"Local: Invalid argument or configuration\", not %s", + rd_kafka_err2str(rd_kafka_event_error(rkev))); + + rd_kafka_event_destroy(rkev); + + + rd_kafka_UserScramCredentialAlteration_destroy_array( + alterations, RD_ARRAY_SIZE(alterations)); + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(rkqu); + + SUB_TEST_PASS(); +} /** * @brief Test a mix of APIs using the same replyq. @@ -2496,6 +2630,12 @@ static void do_test_apis(rd_kafka_type_t cltype) { do_test_ListConsumerGroupOffsets("main queue, options", rk, mainq, 1, rd_true); + do_test_DescribeUserScramCredentials("main queue", rk, mainq); + do_test_DescribeUserScramCredentials("temp queue", rk, NULL); + + do_test_AlterUserScramCredentials("main queue", rk, mainq); + do_test_AlterUserScramCredentials("temp queue", rk, NULL); + do_test_mix(rk, mainq); do_test_configs(rk, mainq); diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 7d8799ea23..e960342f17 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -3899,6 +3899,321 @@ static void do_test_ListConsumerGroupOffsets(const char *what, SUB_TEST_PASS(); } +static void do_test_UserScramCredentials(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + rd_bool_t null_bytes) { + rd_kafka_event_t *event; + rd_kafka_resp_err_t err; + const rd_kafka_DescribeUserScramCredentials_result_t *describe_result; + const rd_kafka_UserScramCredentialsDescription_t **descriptions; + const rd_kafka_UserScramCredentialsDescription_t *description; + const rd_kafka_AlterUserScramCredentials_result_t *alter_result; + const rd_kafka_AlterUserScramCredentials_result_response_t * + *alter_responses; + const rd_kafka_AlterUserScramCredentials_result_response_t *response; + const rd_kafka_ScramCredentialInfo_t *scram_credential; + rd_kafka_ScramMechanism_t mechanism; + size_t response_cnt; + size_t description_cnt; + size_t num_credentials; + char errstr[512]; + const char *username; + const rd_kafka_error_t *error; + int32_t iterations; + rd_kafka_UserScramCredentialAlteration_t *alterations[1]; + char *salt = tsprintf("%s", "salt"); + size_t salt_size = 4; + char *password = tsprintf("%s", "password"); + size_t password_size = 8; + rd_kafka_queue_t *queue; + const char *users[1]; + users[0] = "testuserforscram"; + + if (null_bytes) { + salt[1] = '\0'; + salt[3] = '\0'; + password[0] = '\0'; + password[3] = '\0'; + } + + SUB_TEST_QUICK("%s, null bytes: %s", what, RD_STR_ToF(null_bytes)); + + queue = useq ? useq : rd_kafka_queue_new(rk); + + rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS); + + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + /* Describe an unknown user */ + rd_kafka_DescribeUserScramCredentials(rk, users, RD_ARRAY_SIZE(users), + options, queue); + rd_kafka_AdminOptions_destroy(options); + event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); + + /* Request level error code should be 0*/ + TEST_CALL_ERR__(rd_kafka_event_error(event)); + err = rd_kafka_event_error(event); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected NO_ERROR, not %s", rd_kafka_err2name(err)); + + describe_result = + rd_kafka_event_DescribeUserScramCredentials_result(event); + descriptions = + rd_kafka_DescribeUserScramCredentials_result_descriptions( + describe_result, &description_cnt); + + /* Assert num_results should be 1 */ + TEST_ASSERT(description_cnt == 1, + "There should be exactly 1 description, got %" PRIusz, + description_cnt); + + description = descriptions[0]; + username = rd_kafka_UserScramCredentialsDescription_user(description); + error = rd_kafka_UserScramCredentialsDescription_error(description); + err = rd_kafka_error_code(error); + + num_credentials = + rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count( + description); + /* username should be the same, err should be RESOURCE_NOT_FOUND + * and num_credentials should be 0 */ + TEST_ASSERT(strcmp(users[0], username) == 0, + "Username should be %s, got %s", users[0], username); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND, + "Error code should be RESOURCE_NOT_FOUND as user " + "does not exist, got %s", + rd_kafka_err2name(err)); + TEST_ASSERT(num_credentials == 0, + "Credentials count should be 0, got %" PRIusz, + num_credentials); + rd_kafka_event_destroy(event); + + /* Create a credential for user 0 */ + mechanism = RD_KAFKA_SCRAM_MECHANISM_SHA_256; + iterations = 10000; + alterations[0] = rd_kafka_UserScramCredentialUpsertion_new( + users[0], mechanism, iterations, (unsigned char *)password, + password_size, (unsigned char *)salt, salt_size); + + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS); + + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + rd_kafka_AlterUserScramCredentials( + rk, alterations, RD_ARRAY_SIZE(alterations), options, queue); + rd_kafka_AdminOptions_destroy(options); + rd_kafka_UserScramCredentialAlteration_destroy_array( + alterations, RD_ARRAY_SIZE(alterations)); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); + err = rd_kafka_event_error(event); +#if !WITH_SSL + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, + "Expected _INVALID_ARG, not %s", rd_kafka_err2name(err)); + rd_kafka_event_destroy(event); + goto final_checks; +#else + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected NO_ERROR, not %s", rd_kafka_err2name(err)); + + alter_result = rd_kafka_event_AlterUserScramCredentials_result(event); + alter_responses = rd_kafka_AlterUserScramCredentials_result_responses( + alter_result, &response_cnt); + + /* response_cnt should be 1*/ + TEST_ASSERT(response_cnt == 1, + "There should be exactly 1 response, got %" PRIusz, + response_cnt); + + response = alter_responses[0]; + username = + rd_kafka_AlterUserScramCredentials_result_response_user(response); + error = + rd_kafka_AlterUserScramCredentials_result_response_error(response); + + err = rd_kafka_error_code(error); + /* username should be the same and err should be NO_ERROR*/ + TEST_ASSERT(strcmp(users[0], username) == 0, + "Username should be %s, got %s", users[0], username); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Error code should be NO_ERROR, got %s", + rd_kafka_err2name(err)); + + rd_kafka_event_destroy(event); +#endif + + /* Credential should be retrieved */ + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS); + + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + rd_kafka_DescribeUserScramCredentials(rk, users, RD_ARRAY_SIZE(users), + options, queue); + rd_kafka_AdminOptions_destroy(options); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); + err = rd_kafka_event_error(event); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected NO_ERROR, not %s", rd_kafka_err2name(err)); + + describe_result = + rd_kafka_event_DescribeUserScramCredentials_result(event); + descriptions = + rd_kafka_DescribeUserScramCredentials_result_descriptions( + describe_result, &description_cnt); + /* Assert description_cnt should be 1 , request level error code should + * be 0*/ + TEST_ASSERT(description_cnt == 1, + "There should be exactly 1 description, got %" PRIusz, + description_cnt); + + description = descriptions[0]; + username = rd_kafka_UserScramCredentialsDescription_user(description); + error = rd_kafka_UserScramCredentialsDescription_error(description); + err = rd_kafka_error_code(error); + + num_credentials = + rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count( + description); + /* username should be the same, err should be NO_ERROR and + * num_credentials should be 1 */ + TEST_ASSERT(strcmp(users[0], username) == 0, + "Username should be %s, got %s", users[0], username); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Error code should be NO_ERROR, got %s", + rd_kafka_err2name(err)); + TEST_ASSERT(num_credentials == 1, + "Credentials count should be 1, got %" PRIusz, + num_credentials); + + scram_credential = + rd_kafka_UserScramCredentialsDescription_scramcredentialinfo( + description, 0); + mechanism = rd_kafka_ScramCredentialInfo_mechanism(scram_credential); + iterations = rd_kafka_ScramCredentialInfo_iterations(scram_credential); + /* mechanism should be SHA 256 and iterations 10000 */ + TEST_ASSERT(mechanism == RD_KAFKA_SCRAM_MECHANISM_SHA_256, + "Mechanism should be %d, got: %d", + RD_KAFKA_SCRAM_MECHANISM_SHA_256, mechanism); + TEST_ASSERT(iterations == 10000, + "Iterations should be 10000, got %" PRId32, iterations); + + rd_kafka_event_destroy(event); + + /* Delete the credential */ + alterations[0] = + rd_kafka_UserScramCredentialDeletion_new(users[0], mechanism); + + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS); + + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + rd_kafka_AlterUserScramCredentials( + rk, alterations, RD_ARRAY_SIZE(alterations), options, queue); + rd_kafka_AdminOptions_destroy(options); + rd_kafka_UserScramCredentialAlteration_destroy_array( + alterations, RD_ARRAY_SIZE(alterations)); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); + err = rd_kafka_event_error(event); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected NO_ERROR, not %s", rd_kafka_err2name(err)); + + alter_result = rd_kafka_event_AlterUserScramCredentials_result(event); + alter_responses = rd_kafka_AlterUserScramCredentials_result_responses( + alter_result, &response_cnt); + + /* response_cnt should be 1*/ + TEST_ASSERT(response_cnt == 1, + "There should be exactly 1 response, got %" PRIusz, + response_cnt); + + response = alter_responses[0]; + username = + rd_kafka_AlterUserScramCredentials_result_response_user(response); + error = + rd_kafka_AlterUserScramCredentials_result_response_error(response); + + err = rd_kafka_error_code(error); + /* username should be the same and err should be NO_ERROR*/ + TEST_ASSERT(strcmp(users[0], username) == 0, + "Username should be %s, got %s", users[0], username); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Error code should be NO_ERROR, got %s", + rd_kafka_err2name(err)); + + rd_kafka_event_destroy(event); + +#if !WITH_SSL +final_checks: +#endif + + /* Credential doesn't exist anymore for this user */ + + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS); + + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + rd_kafka_DescribeUserScramCredentials(rk, users, RD_ARRAY_SIZE(users), + options, queue); + rd_kafka_AdminOptions_destroy(options); + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); + err = rd_kafka_event_error(event); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected NO_ERROR, not %s", rd_kafka_err2name(err)); + + describe_result = + rd_kafka_event_DescribeUserScramCredentials_result(event); + descriptions = + rd_kafka_DescribeUserScramCredentials_result_descriptions( + describe_result, &description_cnt); + /* Assert description_cnt should be 1, request level error code should + * be 0*/ + TEST_ASSERT(description_cnt == 1, + "There should be exactly 1 description, got %" PRIusz, + description_cnt); + + description = descriptions[0]; + username = rd_kafka_UserScramCredentialsDescription_user(description); + error = rd_kafka_UserScramCredentialsDescription_error(description); + err = rd_kafka_error_code(error); + num_credentials = + rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count( + description); + /* username should be the same, err should be RESOURCE_NOT_FOUND + * and num_credentials should be 0 */ + TEST_ASSERT(strcmp(users[0], username) == 0, + "Username should be %s, got %s", users[0], username); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND, + "Error code should be RESOURCE_NOT_FOUND, got %s", + rd_kafka_err2name(err)); + TEST_ASSERT(num_credentials == 0, + "Credentials count should be 0, got %" PRIusz, + num_credentials); + + rd_kafka_event_destroy(event); + + if (!useq) + rd_kafka_queue_destroy(queue); + + SUB_TEST_PASS(); +} + static void do_test_apis(rd_kafka_type_t cltype) { rd_kafka_t *rk; rd_kafka_conf_t *conf; @@ -4026,6 +4341,12 @@ static void do_test_apis(rd_kafka_type_t cltype) { rd_true /*with subscribing consumer*/, rd_true); } + if (test_broker_version >= TEST_BRKVER(2, 7, 0, 0)) { + do_test_UserScramCredentials("main queue", rk, mainq, rd_false); + do_test_UserScramCredentials("temp queue", rk, NULL, rd_false); + do_test_UserScramCredentials("main queue", rk, mainq, rd_true); + } + rd_kafka_queue_destroy(mainq); rd_kafka_destroy(rk); @@ -4037,7 +4358,6 @@ static void do_test_apis(rd_kafka_type_t cltype) { int main_0081_admin(int argc, char **argv) { do_test_apis(RD_KAFKA_PRODUCER); - if (test_quick) { TEST_SAY("Skipping further 0081 tests due to quick mode\n"); return 0;