From 961946e55fb3f89eb782d4011af4bf5cd3c31f17 Mon Sep 17 00:00:00 2001
From: Anchit Jain <112778471+anchitj@users.noreply.github.com>
Date: Fri, 7 Jul 2023 16:27:01 +0530
Subject: [PATCH] Add KIP-235 implementation (#4292)
Add DNS alias support for secured connection, needed
for Kerberos SASL authentication.
---
CHANGELOG.md | 2 +
CONFIGURATION.md | 1 +
INTRODUCTION.md | 2 +-
src/rdaddr.h | 2 +-
src/rdkafka.c | 3 +-
src/rdkafka_broker.c | 88 +++++++++++++++++++++++++++++++++++---------
src/rdkafka_broker.h | 4 +-
src/rdkafka_conf.c | 13 +++++++
src/rdkafka_conf.h | 6 +++
tests/0004-conf.c | 2 +
10 files changed, 102 insertions(+), 21 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 87005fc0d7..b39a7249ae 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,8 @@ librdkafka v2.2.0 is a feature release:
closes as normal ones (#4294).
* Added `fetch.queue.backoff.ms` to the consumer to control how long
the consumer backs off next fetch attempt. (@bitemyapp, @edenhill, #2879)
+ * [KIP-235](https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection):
+ Add DNS alias support for secured connection (#4292).
## Enhancements
diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 9a0e7ab4c7..127fe4c88f 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -152,6 +152,7 @@ delivery.report.only.error | P | true, false | false
dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb())
*Type: see dedicated API*
dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
*Type: see dedicated API*
sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
*Type: integer*
+client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. NOTE: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname.
*Type: enum value*
## Topic configuration properties
diff --git a/INTRODUCTION.md b/INTRODUCTION.md
index 101a338b55..32d42bd1aa 100644
--- a/INTRODUCTION.md
+++ b/INTRODUCTION.md
@@ -1900,7 +1900,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-226 - AdminAPI: Dynamic broker config | 1.1.0 | Supported |
| KIP-227 - Consumer Incremental Fetch | 1.1.0 | Not supported |
| KIP-229 - AdminAPI: DeleteGroups | 1.1.0 | Supported |
-| KIP-235 - DNS alias for secure connections | 2.1.0 | Not supported |
+| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported |
| KIP-249 - AdminAPI: Deletegation Tokens | 2.0.0 | Not supported |
| KIP-255 - SASL OAUTHBEARER | 2.0.0 | Supported |
| KIP-266 - Fix indefinite consumer timeouts | 2.0.0 | Supported (bound by session.timeout.ms and max.poll.interval.ms) |
diff --git a/src/rdaddr.h b/src/rdaddr.h
index 0c407a2969..7e86a549a8 100644
--- a/src/rdaddr.h
+++ b/src/rdaddr.h
@@ -139,7 +139,7 @@ rd_sockaddr_list_next(rd_sockaddr_list_t *rsal) {
#define RD_SOCKADDR_LIST_FOREACH(sinx, rsal) \
for ((sinx) = &(rsal)->rsal_addr[0]; \
- (sinx) < &(rsal)->rsal_addr[(rsal)->rsal_len]; (sinx)++)
+ (sinx) < &(rsal)->rsal_addr[(rsal)->rsal_cnt]; (sinx)++)
/**
* Wrapper for getaddrinfo(3) that performs these additional tasks:
diff --git a/src/rdkafka.c b/src/rdkafka.c
index 0311285587..4f37ecc974 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2524,7 +2524,8 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
/* Add initial list of brokers from configuration */
if (rk->rk_conf.brokerlist) {
- if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
+ if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist,
+ rd_true) == 0)
rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
"No brokers configured");
}
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index da6a2b1803..481c21d9c5 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -50,6 +50,7 @@
#include
#include "rd.h"
+#include "rdaddr.h"
#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_msgset.h"
@@ -5257,6 +5258,31 @@ static int rd_kafka_broker_name_parse(rd_kafka_t *rk,
return 0;
}
+/**
+ * @brief Find an existing broker matching \p proto, \p host and \p port,
+ *        or add a new one. *cnt is increased by one if a configured broker
+ *        was found or a new broker was added, else not.
+ */
+static void rd_kafka_find_or_add_broker(rd_kafka_t *rk,
+ rd_kafka_secproto_t proto,
+ const char *host,
+ uint16_t port,
+ int *cnt) {
+ rd_kafka_broker_t *rkb = NULL;
+
+ if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&
+ rkb->rkb_source == RD_KAFKA_CONFIGURED) {
+ (*cnt)++;
+ } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, host,
+ port, RD_KAFKA_NODEID_UA) != NULL)
+ (*cnt)++;
+
+ /* If rd_kafka_broker_find() returned a broker, its
+ * reference needs to be released.
+ * See issue #193 */
+ if (rkb)
+ rd_kafka_broker_destroy(rkb);
+}
+
/**
* @brief Adds a (csv list of) broker(s).
* Returns the number of brokers succesfully added.
@@ -5264,17 +5290,22 @@ static int rd_kafka_broker_name_parse(rd_kafka_t *rk,
* @locality any thread
* @locks none
*/
-int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) {
+int rd_kafka_brokers_add0(rd_kafka_t *rk,
+ const char *brokerlist,
+ rd_bool_t is_bootstrap_server_list) {
char *s_copy = rd_strdup(brokerlist);
char *s = s_copy;
int cnt = 0;
- rd_kafka_broker_t *rkb;
- int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt);
+ int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt);
+ rd_sockaddr_inx_t *sinx;
+ rd_sockaddr_list_t *sockaddr_list;
/* Parse comma-separated list of brokers. */
while (*s) {
uint16_t port;
const char *host;
+ const char *err_str;
+ const char *resolved_FQDN;
rd_kafka_secproto_t proto;
if (*s == ',' || *s == ' ') {
@@ -5287,20 +5318,43 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) {
break;
rd_kafka_wrlock(rk);
+ if (is_bootstrap_server_list &&
+ rk->rk_conf.client_dns_lookup ==
+ RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+ rd_kafka_dbg(rk, ALL, "INIT",
+ "Canonicalizing bootstrap broker %s:%d",
+ host, port);
+ sockaddr_list = rd_getaddrinfo(
+ host, RD_KAFKA_PORT_STR, AI_ADDRCONFIG,
+ rk->rk_conf.broker_addr_family, SOCK_STREAM,
+ IPPROTO_TCP, rk->rk_conf.resolve_cb,
+ rk->rk_conf.opaque, &err_str);
+
+ if (!sockaddr_list) {
+ rd_kafka_log(rk, LOG_WARNING, "BROKER",
+ "Failed to resolve '%s': %s", host,
+ err_str);
+ rd_kafka_wrunlock(rk);
+ continue;
+ }
- if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&
- rkb->rkb_source == RD_KAFKA_CONFIGURED) {
- cnt++;
- } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto,
- host, port,
- RD_KAFKA_NODEID_UA) != NULL)
- cnt++;
-
- /* If rd_kafka_broker_find returned a broker its
- * reference needs to be released
- * See issue #193 */
- if (rkb)
- rd_kafka_broker_destroy(rkb);
+ RD_SOCKADDR_LIST_FOREACH(sinx, sockaddr_list) {
+ resolved_FQDN = rd_sockaddr2str(
+ sinx, RD_SOCKADDR2STR_F_RESOLVE);
+ rd_kafka_dbg(
+ rk, ALL, "INIT",
+ "Adding broker with resolved hostname %s",
+ resolved_FQDN);
+
+ rd_kafka_find_or_add_broker(
+ rk, proto, resolved_FQDN, port, &cnt);
+ };
+
+ rd_sockaddr_list_destroy(sockaddr_list);
+ } else {
+ rd_kafka_find_or_add_broker(rk, proto, host, port,
+ &cnt);
+ }
rd_kafka_wrunlock(rk);
}
@@ -5322,7 +5376,7 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) {
int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) {
- return rd_kafka_brokers_add0(rk, brokerlist);
+ return rd_kafka_brokers_add0(rk, brokerlist, rd_false);
}
diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h
index 1e03dba850..30f66b25c9 100644
--- a/src/rdkafka_broker.h
+++ b/src/rdkafka_broker.h
@@ -469,7 +469,9 @@ rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk,
int state,
rd_kafka_enq_once_t *eonce);
-int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist);
+int rd_kafka_brokers_add0(rd_kafka_t *rk,
+ const char *brokerlist,
+ rd_bool_t is_bootstrap_server_list);
void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state);
void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 285c8e4458..9200af4c6a 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -1439,6 +1439,19 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"A higher value allows for more effective batching of these "
"messages.",
0, 900000, 10},
+ {_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup),
+ "Controls how the client uses DNS lookups. By default, when the lookup "
+ "returns multiple IP addresses for a hostname, they will all be attempted "
+ "for connection before the connection is considered failed. This applies "
+ "to both bootstrap and advertised servers. If the value is set to "
+ "`resolve_canonical_bootstrap_servers_only`, each entry will be resolved "
+ "and expanded into a list of canonical names. NOTE: Default here is "
+ "different from the Java client's default behavior, which connects only "
+ "to the first IP address returned for a hostname. ",
+ .vdef = RD_KAFKA_USE_ALL_DNS_IPS,
+ .s2i = {{RD_KAFKA_USE_ALL_DNS_IPS, "use_all_dns_ips"},
+ {RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
+ "resolve_canonical_bootstrap_servers_only"}}},
/*
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 6a79515c2a..01b6258d2e 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -158,6 +158,11 @@ typedef enum {
RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */
} rd_kafka_ssl_endpoint_id_t;
+typedef enum {
+ RD_KAFKA_USE_ALL_DNS_IPS,
+ RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
+} rd_kafka_client_dns_lookup_t;
+
/* Increase in steps of 64 as needed.
* This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33)
@@ -224,6 +229,7 @@ struct rd_kafka_conf_s {
int api_version_fallback_ms;
char *broker_version_fallback;
rd_kafka_secproto_t security_protocol;
+ rd_kafka_client_dns_lookup_t client_dns_lookup;
struct {
#if WITH_SSL
diff --git a/tests/0004-conf.c b/tests/0004-conf.c
index b5f293921e..5dbd9f0b1d 100644
--- a/tests/0004-conf.c
+++ b/tests/0004-conf.c
@@ -529,6 +529,8 @@ int main_0004_conf(int argc, char **argv) {
"ssl.ca.certificate.stores",
"Intermediate ,, Root ,",
#endif
+ "client.dns.lookup",
+ "resolve_canonical_bootstrap_servers_only",
NULL
};
static const char *tconfs[] = {"request.required.acks",