diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index 61b3555f732ad..b516e01923c58 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -410,16 +410,20 @@ public boolean handleResponse(FetchResponse response) { return false; } if (nextMetadata.isFull()) { - if (response.throttleTimeMs() > 0) { - // [LIKAFKA-59133] To avoid stuck consumer, we made a server side change to return valid fetch responses - // even when the request is throttled. To honor the server side change, we log the throttling and still - // handle the fetch response. + if (response.responseData().isEmpty() && response.throttleTimeMs() > 0) { + // Normally, an empty full fetch response would be invalid. However, KIP-219 + // specifies that if the broker wants to throttle the client, it will respond + // to a full fetch request with an empty response and a throttleTimeMs + // value set. We don't want to log this with a warning, since it's not an error. + // However, the empty full fetch response can't be processed, so it's still appropriate + // to return false here. if (log.isDebugEnabled()) { - log.debug("Node {} sent a response indicate the request is throttled for {} ms.", node, - response.throttleTimeMs()); + log.debug("Node {} sent a empty full fetch response to indicate that this " + + "client should be throttled for {} ms.", node, response.throttleTimeMs()); } + nextMetadata = FetchMetadata.INITIAL; + return false; } - String problem = verifyFullFetchResponsePartitions(response); if (problem != null) { log.info("Node {} sent an invalid full fetch response with {}", node, problem); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9a71e7884f5db..9fb4a74cd416e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -990,23 +990,24 @@ class KafkaApis(val requestChannel: RequestChannel, val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs) val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs) - - // [LIKAFKA-59133] We made a change here to actually fill in the data to the fetch response even when throttling happens. - // This prevents the consumers completely getting stuck when throttling happens intensively. - unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) - // [LIKAFKA-45345] even if the throttleTimeMs is 0, we still record it so that // the throttle-time sensor does not expire before the byte-rate sensor in quotas.fetch // or the request-time sensor in quotas.request. val (effectiveBandwidthThrottleTime, effectiveRequestThrottleTime) = if (maxThrottleTimeMs > 0) { request.apiThrottleTimeMs = maxThrottleTimeMs - + // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value + // from the fetch quota because we are going to return an empty response. + quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs) + // If throttling is required, return an empty response. + unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs) if (bandwidthThrottleTimeMs > requestThrottleTimeMs) { (bandwidthThrottleTimeMs, 0) } else { (0, requestThrottleTimeMs) } } else { + // Get the actual response. This will update the fetch context. + unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) trace(s"Sending Fetch response with partitions.size=$responseSize, metadata=${unconvertedFetchResponse.sessionId}") (0, 0) }