From 56e8cc7a5e8a34ed1a27c8073490e76493ecda86 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Tue, 15 Oct 2024 10:05:09 -0400 Subject: [PATCH] Add schema registry client function (#1565) --- go.mod | 7 +++-- go.sum | 14 +++++---- pkg/redpanda/client.go | 65 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 76 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index b6853157b3..ecdf9d45a8 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,8 @@ require ( github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go/modules/k3s v0.32.0 - github.com/twmb/franz-go v1.15.4 + github.com/twmb/franz-go v1.18.0 + github.com/twmb/franz-go/pkg/sr v1.2.0 github.com/wk8/go-ordered-map/v2 v2.1.8 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f @@ -152,7 +153,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect - github.com/pierrec/lz4/v4 v4.1.19 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_golang v1.19.0 // indirect @@ -178,7 +179,7 @@ require ( github.com/texttheater/golang-levenshtein v1.0.1 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect github.com/twmb/tlscfg v1.2.1 // indirect github.com/virtuald/go-ordered-json v0.0.0-20170621173500-b18e6e673d74 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect diff --git a/go.sum b/go.sum index 626434654e..8f681ace7b 100644 --- a/go.sum +++ b/go.sum @@ -519,8 +519,8 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI= github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4= -github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -647,10 +647,12 @@ github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5I github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY= -github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw= -github.com/twmb/franz-go v1.15.4/go.mod h1:rC18hqNmfo8TMc1kz7CQmHL74PLNF8KVvhflxiiJZCU= -github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= -github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= +github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= +github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= +github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= +github.com/twmb/franz-go/pkg/sr v1.2.0 h1:zYr0Ly7KLFfeCGaSr8teN6LvAVeYVrZoUsyyPHTYB+M= +github.com/twmb/franz-go/pkg/sr v1.2.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw= github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44= github.com/twmb/tlscfg v1.2.1/go.mod h1:GameEQddljI+8Es373JfQEBvtI4dCTLKWGJbqT2kErs= github.com/virtuald/go-ordered-json v0.0.0-20170621173500-b18e6e673d74 h1:JwtAtbp7r/7QSyGz8mKUbYJBg2+6Cd7OjM8o/GNOcVo= diff --git a/pkg/redpanda/client.go b/pkg/redpanda/client.go index f717a58a91..1c082e439e 100644 --- a/pkg/redpanda/client.go +++ b/pkg/redpanda/client.go @@ -8,14 +8,17 @@ import ( "errors" "fmt" "net" + "net/http" "slices" "strings" + "time" "github.com/redpanda-data/helm-charts/charts/redpanda" "github.com/redpanda-data/helm-charts/pkg/gotohelm/helmette" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl" "github.com/twmb/franz-go/pkg/sasl/scram" + "github.com/twmb/franz-go/pkg/sr" "github.com/redpanda-data/common-go/rpadmin" @@ -56,7 +59,7 @@ func AdminClient(dot *helmette.Dot, dialer DialContextFunc) (*rpadmin.AdminAPI, if redpanda.TLSEnabled(dot) { prefix = "https://" - tlsConfig, err = tlsConfigFromDot(dot, values.Listeners.Kafka.TLS.Cert) + tlsConfig, err = tlsConfigFromDot(dot, values.Listeners.Admin.TLS.Cert) if err != nil { return nil, err } @@ -82,6 +85,66 @@ func AdminClient(dot *helmette.Dot, dialer DialContextFunc) (*rpadmin.AdminAPI, return rpadmin.NewAdminAPIWithDialer(hosts, auth, tlsConfig, dialer) } +// SchemaRegistryClient creates a client to talk to a Redpanda cluster admin API based on its helm +// configuration over its internal listeners. +func SchemaRegistryClient(dot *helmette.Dot, dialer DialContextFunc, opts ...sr.ClientOpt) (*sr.Client, error) { + values := helmette.Unwrap[redpanda.Values](dot.Values) + name := redpanda.Fullname(dot) + domain := redpanda.InternalDomain(dot) + prefix := "http://" + + // These transport values come from the TLS client options found here: + // https://github.com/twmb/franz-go/blob/cea7aa5d803781e5f0162187795482ba1990c729/pkg/sr/clientopt.go#L48-L68 + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + DialContext: dialer, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + + if dialer == nil { + transport.DialContext = (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext + } + + if redpanda.TLSEnabled(dot) { + prefix = "https://" + + tlsConfig, err := tlsConfigFromDot(dot, values.Listeners.SchemaRegistry.TLS.Cert) + if err != nil { + return nil, err + } + transport.TLSClientConfig = tlsConfig + } + + copts := []sr.ClientOpt{sr.HTTPClient(&http.Client{ + Timeout: 5 * time.Second, + Transport: transport, + })} + + username, password, _, err := authFromDot(dot) + if err != nil { + return nil, err + } + + if username != "" { + copts = append(copts, sr.BasicAuth(username, password)) + } + + hosts := redpanda.ServerList(values.Statefulset.Replicas, prefix, name, domain, values.Listeners.SchemaRegistry.Port) + copts = append(copts, sr.URLs(hosts...)) + + // finally, override any calculated client opts with whatever was + // passed in + return sr.NewClient(append(copts, opts...)...) +} + // KafkaClient creates a client to talk to a Redpanda cluster based on its helm // configuration over its internal listeners. func KafkaClient(dot *helmette.Dot, dialer DialContextFunc, opts ...kgo.Opt) (*kgo.Client, error) {