Skip to content

Commit

Permalink
[KIP-848] ListGroups filter to list only given group types (#4860)
Browse files Browse the repository at this point in the history
Co-authored-by: mahajanadhitya <amahajan@confluent.io>
  • Loading branch information
emasab and mahajanadhitya authored Oct 4, 2024
1 parent 77622e2 commit 25db856
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ librdkafka v2.6.0 is a feature release:

* [KIP-714] Complete consumer metrics support (#4808).
* [KIP-714] Produce latency average and maximum metrics support for parity with Java client (#4847).
* [KIP-848] ListConsumerGroups Admin API now has an optional filter to return only groups
of given types.
* Fix for permanent fetch errors when using a newer Fetch RPC version with an older
inter broker protocol (#4806).

Expand Down
57 changes: 46 additions & 11 deletions examples/list_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ static void usage(const char *reason, ...) {
fprintf(stderr,
"List groups usage examples\n"
"\n"
"Usage: %s <options> <state1> <state2> ...\n"
"Usage: %s <options> <state_cnt> [<state1> <state2> ...] "
"<type_cnt> [<type1> <type2> ...]\n"
"\n"
"Options:\n"
" -b <brokers> Bootstrap server list to connect to.\n"
Expand Down Expand Up @@ -145,12 +146,15 @@ static int print_groups_info(const rd_kafka_ListConsumerGroups_result_t *list) {
int is_simple_consumer_group =
rd_kafka_ConsumerGroupListing_is_simple_consumer_group(
group);
rd_kafka_consumer_group_type_t type =
rd_kafka_ConsumerGroupListing_type(group);

printf("Group \"%s\", is simple %" PRId32
", "
"state %s",
"state %s, type %s",
group_id, is_simple_consumer_group,
rd_kafka_consumer_group_state_name(state));
rd_kafka_consumer_group_state_name(state),
rd_kafka_consumer_group_type_name(type));
printf("\n");
}
for (i = 0; i < result_error_cnt; i++) {
Expand Down Expand Up @@ -184,24 +188,46 @@ int64_t parse_int(const char *what, const char *str) {
static void
cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
rd_kafka_t *rk;
const char **states_str = NULL;
char errstr[512];
rd_kafka_AdminOptions_t *options;
rd_kafka_event_t *event = NULL;
rd_kafka_error_t *error = NULL;
int i;
int retval = 0;
int states_cnt = 0;
int retval = 0;
int states_cnt = 0;
int types_cnt = 0;
const int min_argc = 2;

rd_kafka_consumer_group_state_t *states;
rd_kafka_consumer_group_type_t *types;

/*
* Argument validation
*/
if (argc < min_argc)
usage("Expected at least %d arguments", min_argc);
else {
states_cnt = parse_int("state count", argv[0]);
if (argc < states_cnt + 2) {
usage("Expected %d state code(s) after states count",
states_cnt);
}

if (argc >= 1) {
states_str = (const char **)&argv[0];
states_cnt = argc;
types_cnt = parse_int("type count", argv[1 + states_cnt]);
if (argc < 1 + states_cnt + 1 + types_cnt) {
usage("Expected %d type(s) after type count",
types_cnt);
}
}

states = calloc(states_cnt, sizeof(rd_kafka_consumer_group_state_t));
for (i = 0; i < states_cnt; i++) {
states[i] = parse_int("state code", states_str[i]);
states[i] = parse_int("state code", argv[i + 1]);
}

types = calloc(types_cnt, sizeof(rd_kafka_consumer_group_type_t));
for (i = 0; i < types_cnt; i++) {
types[i] = parse_int("type code", argv[i + states_cnt + 2]);
}

/*
Expand Down Expand Up @@ -235,10 +261,17 @@ cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
options, states, states_cnt))) {
fprintf(stderr, "%% Failed to set states: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
goto exit;
}
free(states);
if ((error = rd_kafka_AdminOptions_set_match_consumer_group_types(
options, types, types_cnt))) {
fprintf(stderr, "%% Failed to set types: %s\n",
rd_kafka_error_string(error));
goto exit;
}
free(types);


rd_kafka_ListConsumerGroups(rk, options, queue);
rd_kafka_AdminOptions_destroy(options);
Expand Down Expand Up @@ -273,6 +306,8 @@ cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {


exit:
if (error)
rd_kafka_error_destroy(error);
if (event)
rd_kafka_event_destroy(event);
rd_kafka_queue_destroy(queue);
Expand Down
22 changes: 21 additions & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4737,6 +4737,26 @@ rd_kafka_consumer_group_state_code(const char *name) {
return RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN;
}

static const char *rd_kafka_consumer_group_type_names[] = {
"Unknown", "Consumer", "Classic"};

const char *
rd_kafka_consumer_group_type_name(rd_kafka_consumer_group_type_t type) {
if (type < 0 || type >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT)
return NULL;
return rd_kafka_consumer_group_type_names[type];
}

rd_kafka_consumer_group_type_t
rd_kafka_consumer_group_type_code(const char *name) {
size_t i;
for (i = 0; i < RD_KAFKA_CONSUMER_GROUP_TYPE__CNT; i++) {
if (!rd_strcasecmp(rd_kafka_consumer_group_type_names[i], name))
return i;
}
return RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN;
}

static void rd_kafka_DescribeGroups_resp_cb(rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
Expand Down Expand Up @@ -5002,7 +5022,7 @@ rd_kafka_list_groups(rd_kafka_t *rk,
state.wait_cnt++;
rkb_cnt++;
error = rd_kafka_ListGroupsRequest(
rkb, 0, NULL, 0, RD_KAFKA_REPLYQ(state.q, 0),
rkb, 0, NULL, 0, NULL, 0, RD_KAFKA_REPLYQ(state.q, 0),
rd_kafka_ListGroups_resp_cb, &state);
if (error) {
rd_kafka_ListGroups_resp_cb(rk, rkb,
Expand Down
65 changes: 65 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -5158,6 +5158,18 @@ typedef enum {
RD_KAFKA_CONSUMER_GROUP_STATE__CNT
} rd_kafka_consumer_group_state_t;

/**
* @enum rd_kafka_consumer_group_type_t
*
* @brief Consumer group type.
*/
typedef enum {
RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN = 0,
RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER = 1,
RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC = 2,
RD_KAFKA_CONSUMER_GROUP_TYPE__CNT
} rd_kafka_consumer_group_type_t;

/**
* @brief Group information
*/
Expand Down Expand Up @@ -5242,6 +5254,30 @@ RD_EXPORT
rd_kafka_consumer_group_state_t
rd_kafka_consumer_group_state_code(const char *name);

/**
* @brief Returns a name for a group type code.
*
* @param type The group type value.
*
* @return The group type name corresponding to the provided group type value.
*/
RD_EXPORT
const char *
rd_kafka_consumer_group_type_name(rd_kafka_consumer_group_type_t type);

/**
* @brief Returns a code for a group type name.
*
* @param name The group type name.
*
* @remark The comparison is case-insensitive.
*
* @return The group type value corresponding to the provided group type name.
*/
RD_EXPORT
rd_kafka_consumer_group_type_t
rd_kafka_consumer_group_type_code(const char *name);

/**
* @brief Release list memory
*/
Expand Down Expand Up @@ -7208,6 +7244,24 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_match_consumer_group_states(
const rd_kafka_consumer_group_state_t *consumer_group_states,
size_t consumer_group_states_cnt);

/**
* @brief Set consumer groups types to query for.
*
* @param options Admin options.
* @param consumer_group_types Array of consumer group types.
* @param consumer_group_types_cnt Size of the \p consumer_group_types array.
*
* @return NULL on success, a new error instance that must be
* released with rd_kafka_error_destroy() in case of error.
*
* @remark This option is valid for ListConsumerGroups.
*/
RD_EXPORT
rd_kafka_error_t *rd_kafka_AdminOptions_set_match_consumer_group_types(
rd_kafka_AdminOptions_t *options,
const rd_kafka_consumer_group_type_t *consumer_group_types,
size_t consumer_group_types_cnt);

/**
* @brief Set Isolation Level to an allowed `rd_kafka_IsolationLevel_t` value.
*/
Expand Down Expand Up @@ -8532,6 +8586,17 @@ RD_EXPORT
rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupListing_state(
const rd_kafka_ConsumerGroupListing_t *grplist);

/**
* @brief Gets type for the \p grplist group.
*
* @param grplist The group listing.
*
* @return A group type.
*/
RD_EXPORT
rd_kafka_consumer_group_type_t rd_kafka_ConsumerGroupListing_type(
const rd_kafka_ConsumerGroupListing_t *grplist);

/**
* @brief Get an array of valid list groups from a ListConsumerGroups result.
*
Expand Down
Loading

0 comments on commit 25db856

Please sign in to comment.