Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit api versions and metadata v13 #184

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proxy/processor_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ
return false, nil, err
}

// Update proxy/protocol/responses.go apiKeyProduceMaxVersion when adding new Produce version support
case 3, 4, 5, 6, 7, 8, 9, 10, 11:
// CorrelationID + ClientID
if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
Expand Down
150 changes: 149 additions & 1 deletion proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,28 @@ package protocol
import (
"errors"
"fmt"
"math"

"github.com/grepplabs/kafka-proxy/config"
)

const (
apiKeyProduce = 0
apiKeyMetadata = 3
apiKeyFindCoordinator = 10
apiKeyApiVersions = 18

// Update ApiVersions response to prevent requests/responses that can't be parsed by Kafka-Proxy
apiKeyApiVersionsMaxVersion = 4
apiKeyMetadataMaxVersion = 13
apiKeyFindCoordinatorMaxVersion = 6
// produce requests are parsed by proxy/processor_default.go mustReply()
apiKeyProduceMaxVersion = 11

brokersKeyName = "brokers"
hostKeyName = "host"
portKeyName = "port"
apiKeysKeyname = "api_keys"

coordinatorKeyName = "coordinator"
coordinatorsKeyName = "coordinators"
Expand All @@ -22,6 +33,7 @@ const (
var (
metadataResponseSchemaVersions = createMetadataResponseSchemaVersions()
findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions()
apiVersionsResponseSchemaVersions = createApiVersionsResponseSchemaVersions()
)

func createMetadataResponseSchemaVersions() []Schema {
Expand Down Expand Up @@ -243,7 +255,33 @@ func createMetadataResponseSchemaVersions() []Schema {
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12}
metadataResponseV13 := NewSchema("metadata_response_v13",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9},
&Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12},
&Mfield{Name: "error_code", Ty: TypeInt16},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

// Update apiKeyMetadataMaxVersion when adding new versions
return []Schema{
metadataResponseV0,
metadataResponseV1,
metadataResponseV2,
metadataResponseV3,
metadataResponseV4,
metadataResponseV5,
metadataResponseV6,
metadataResponseV7,
metadataResponseV8,
metadataResponseV9,
metadataResponseV10,
metadataResponseV11,
metadataResponseV12,
metadataResponseV13,
}
}

func createFindCoordinatorResponseSchemaVersions() []Schema {
Expand Down Expand Up @@ -296,9 +334,117 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
findCoordinatorResponseV5 := findCoordinatorResponseV4
findCoordinatorResponseV6 := findCoordinatorResponseV5

// Update apiKeyFindCoordinatorMaxVersion when adding new versions
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6}
}

func createApiVersionsResponseSchemaVersions() []Schema {
apiVersionKeyV0 := NewSchema("api_versions_key_v0",
&Mfield{Name: "api_key", Ty: TypeInt16},
&Mfield{Name: "min_version", Ty: TypeInt16},
&Mfield{Name: "max_version", Ty: TypeInt16},
)

apiVersionSchemaV3 := NewSchema("api_versions_key_schema3",
&Mfield{Name: "api_key", Ty: TypeInt16},
&Mfield{Name: "min_version", Ty: TypeInt16},
&Mfield{Name: "max_version", Ty: TypeInt16},
&SchemaTaggedFields{"api_versions_tagged_fields"},
)

apiVersionsResponseV0 := NewSchema("api_versions_response_v0",
&Mfield{Name: "error_code", Ty: TypeInt16},
&Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0},
)

// Version 1 adds throttle time to the response.
apiVersionsResponseV1 := NewSchema("api_versions_response_v1",
&Mfield{Name: "error_code", Ty: TypeInt16},
&Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0},
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
)

// Starting in version 2, on quota violation, brokers send out responses before throttling.
apiVersionsResponseV2 := apiVersionsResponseV1

// Version 3 is the first flexible version. Tagged fields are only supported in the body but
// not in the header. The length of the header must not change in order to guarantee the
// backward compatibility.
//
// Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported
// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
apiVersionsResponseV3 := NewSchema("api_versions_response_v3",
&Mfield{Name: "error_code", Ty: TypeInt16},
&CompactArray{Name: apiKeysKeyname, Ty: apiVersionSchemaV3},
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0.
apiVersionsResponseV4 := apiVersionsResponseV3

// Update apiKeyApiVersionsMaxVersion when adding new versions
return []Schema{
apiVersionsResponseV0,
apiVersionsResponseV1,
apiVersionsResponseV2,
apiVersionsResponseV3,
apiVersionsResponseV4,
}
}

func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
}
if fn == nil {
return errors.New("net address mapper must not be nil")
}
apiVersionsArray, ok := decodedStruct.Get(apiKeysKeyname).([]interface{})
if !ok {
return errors.New("api versions not found")
}
for _, apiVersionElement := range apiVersionsArray {
apiVersion := apiVersionElement.(*Struct)
apiKey, ok := apiVersion.Get("api_key").(int16)
if !ok {
return errors.New("api_keys.api_key not found")
}
maxVersion, ok := apiVersion.Get("max_version").(int16)
if !ok {
return errors.New("api_keys.max_version not found")
}

limitVersion := int16(math.MaxInt16)
switch apiKey {
case apiKeyProduce:
if maxVersion > apiKeyProduceMaxVersion {
limitVersion = apiKeyProduceMaxVersion
}
case apiKeyMetadata:
if maxVersion > apiKeyMetadataMaxVersion {
limitVersion = apiKeyMetadataMaxVersion
}
case apiKeyFindCoordinator:
if maxVersion > apiKeyFindCoordinatorMaxVersion {
limitVersion = apiKeyFindCoordinatorMaxVersion
}
case apiKeyApiVersions:
if maxVersion > apiKeyApiVersionsMaxVersion {
limitVersion = apiKeyApiVersionsMaxVersion
}
}
if maxVersion > limitVersion {
err := apiVersion.Replace("max_version", limitVersion)
if err != nil {
return err
}
}
}

return nil
}

func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
Expand Down Expand Up @@ -437,6 +583,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse)
case apiKeyFindCoordinator:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse)
case apiKeyApiVersions:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, apiVersionsResponseSchemaVersions, modifyApiVersionsResponse)
default:
return nil, nil
}
Expand Down