feat(kafka): Add OAUTH support for Confluent Cloud Kafka #5100

Closed · wants to merge 19 commits
5 changes: 5 additions & 0 deletions .dockerignore
@@ -25,3 +25,8 @@ scheduler/mlrepo
scheduler/mnt
scheduler/notebooks
scheduler/venv

# General
go.work
go.work.sum

11 changes: 11 additions & 0 deletions Makefile
@@ -32,6 +32,17 @@ undeploy-k8s:
# Dev
#


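# Create a go.work workspace covering the repo's Go modules for local development (go.work is excluded from images via .dockerignore)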
init-go-modules:
	go work init || echo "go modules already initialized"
	go work use operator
	go work use scheduler
	go work use apis/go
	go work use components/tls
	go work use hodometer
	go work use tests/integration


# use -W option for warnings as errors
docs_build_html:
cd docs && \
2 changes: 1 addition & 1 deletion components/tls/pkg/password/k8s_secret.go
@@ -106,7 +106,7 @@ func (s *PasswordSecretHandler) onUpdate(oldObj, newObj interface{}) {
logger := s.logger.WithField("func", "onUpdate")
secret := newObj.(*corev1.Secret)
if secret.Name == s.secretName {
logger.Infof("TLS Secret %s updated", s.secretName)
logger.Infof("Password Secret %s updated", s.secretName)
err := s.savePasswordFromSecret(secret)
if err != nil {
logger.WithError(err).Errorf("Failed to extract password from secret %s", secret.Name)
15 changes: 8 additions & 7 deletions components/tls/pkg/password/store.go
@@ -31,16 +31,16 @@ const (
envNamespace = "POD_NAMESPACE"
)

type funcTLSServerOption struct {
type funcPasswordServerOption struct {
f func(options *PasswordStoreOptions)
}

func (fdo *funcTLSServerOption) apply(do *PasswordStoreOptions) {
func (fdo *funcPasswordServerOption) apply(do *PasswordStoreOptions) {
fdo.f(do)
}

func newFuncServerOption(f func(options *PasswordStoreOptions)) *funcTLSServerOption {
return &funcTLSServerOption{
func newFuncServerOption(f func(options *PasswordStoreOptions)) *funcPasswordServerOption {
return &funcPasswordServerOption{
f: f,
}
}
@@ -61,8 +61,8 @@ type PasswordStoreOptions struct {
}

func (c PasswordStoreOptions) String() string {
return fmt.Sprintf("prefix=%s clientset=%v",
c.prefix, c.clientset)
return fmt.Sprintf("prefix=%s locationSuffix=%s clientset=%v",
c.prefix, c.locationSuffix, c.clientset)
}

func getDefaultPasswordStoreOptions() PasswordStoreOptions {
@@ -96,8 +96,9 @@ func NewPasswordStore(opt ...PasswordStoreOption) (PasswordStore, error) {
if secretName, ok := util.GetEnv(opts.prefix, envSecretSuffix); ok {
logger.Infof("Starting new password k8s secret store for %s from secret %s", opts.prefix, secretName)
namespace, ok := os.LookupEnv(envNamespace)
logger.Infof("Namespace %s", namespace)
if !ok {
return nil, fmt.Errorf("Namespace env var %s not found and needed for secret TLS", envNamespace)
return nil, fmt.Errorf("Namespace env var %s not found and needed for password secret", envNamespace)
}
ps, err := NewPasswordSecretHandler(secretName, opts.clientset, namespace, opts.prefix, opts.locationSuffix, logger)
if err != nil {
1 change: 1 addition & 0 deletions components/tls/pkg/tls/util.go
@@ -30,6 +30,7 @@ const (
EnvSASLMechanismSuffix = "_SASL_MECHANISM"
SASLMechanismSCRAMSHA512 = "SCRAM-SHA-512"
SASLMechanismSCRAMSHA256 = "SCRAM-SHA-256"
SASLMechanismOAUTHBEARER = "OAUTHBEARER"
SASLMechanismPlain = "PLAIN"

EnvEndpointIdentificationMechanismSuffix = "_TLS_ENDPOINT_IDENTIFICATION_ALGORITHM"
@@ -24,6 +24,8 @@ kubectl create secret generic aws-msk-kafka-secret -n seldon-mesh --from-literal

## Configure Seldon Core v2

Configure Seldon Core v2 by setting the following Helm values:

```{literalinclude} ../../../../../../k8s/samples/values-aws-msk-kafka-sasl-scram.yaml.tmpl
:language: yaml
```
@@ -46,6 +46,8 @@ kubectl create secret generic azure-kafka-secret -n seldon-mesh --from-literal p

## Configure Seldon Core v2

Configure Seldon Core v2 by setting the following Helm values:

```{literalinclude} ../../../../../../k8s/samples/values-azure-event-hub-sasl.yaml.tmpl
:language: yaml
```
@@ -0,0 +1,65 @@
# Confluent Cloud OAuth 2.0 Example

> New in Seldon Core 2.7.0

Seldon Core v2 can integrate with Confluent Cloud managed Kafka.
In this example we use the [OAuth 2.0 security mechanism](https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/overview.html).


## Configure Identity Provider in Confluent Cloud Console

In your Confluent Cloud Console, go to [Account & Access / Identity providers](https://confluent.cloud/settings/org/identity_providers) and register your Identity Provider.


See Confluent Cloud [documentation](https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/identity-providers.html) for further details.


## Configure Identity Pool

In your Confluent Cloud Console, go to [Account & Access / Identity providers](https://confluent.cloud/settings/org/identity_providers) and add a new identity pool to your newly registered Identity Provider.

See Confluent Cloud [documentation](https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/identity-pools.html) for further details.


## Create Kubernetes Secret

Seldon Core v2 expects the OAuth credentials to be provided in the form of a Kubernetes Secret:
```yaml
apiVersion: v1
kind: Secret
metadata:
  name: confluent-kafka-oauth
  namespace: seldon-mesh
type: Opaque
stringData:
  method: OIDC
  client_id: <client id>
  client_secret: <client secret>
  token_endpoint_url: <token endpoint url>
  extensions: logicalCluster=<cluster id>,identityPoolId=<identity pool id>
  scope: ""
```

You will need the following information from Confluent Cloud:
- Cluster ID: `Cluster Overview` → `Cluster Settings` → `General` → `Identification`
- Identity Pool ID: `Accounts & access` → `Identity providers` → `<specific provider details>`

The client ID, client secret, and token endpoint URL should come from your identity provider, e.g. Keycloak or Azure AD.
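
For illustration, the same Secret could also be created imperatively with `kubectl`; a minimal sketch using the placeholder values from the manifest above:

```bash
kubectl create secret generic confluent-kafka-oauth -n seldon-mesh \
  --from-literal method=OIDC \
  --from-literal client_id="<client id>" \
  --from-literal client_secret="<client secret>" \
  --from-literal token_endpoint_url="<token endpoint url>" \
  --from-literal extensions="logicalCluster=<cluster id>,identityPoolId=<identity pool id>" \
  --from-literal scope=""
```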


## Configure Seldon Core v2

Configure Seldon Core v2 by setting the following Helm values:

```{literalinclude} ../../../../../../k8s/samples/values-confluent-kafka-oauth.yaml.tmpl
:language: yaml
```

Note that you may need to adjust `replicationFactor` and `numPartitions` to match your cluster configuration.
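
For example, on a small development cluster with a single broker you might lower both values. A sketch of the corresponding Helm override (the sample values file above defaults to 3 and 4):

```yaml
kafka:
  topics:
    replicationFactor: 1  # must not exceed the number of Kafka brokers
    numPartitions: 1      # fewer partitions for a low-throughput dev setup
```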


## Troubleshooting

- First, check the Confluent Cloud [documentation](https://docs.confluent.io/cloud/current/overview.html).

- Set the Kafka config map debug setting to `all`. For a Helm install you can set `kafka.debug=all`, as sketched below.
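
A sketch of enabling this on an existing install; the chart path and release name (`seldon-core-v2-setup`) are assumptions here and should match your own installation:

```bash
helm upgrade seldon-core-v2-setup k8s/helm-charts/seldon-core-v2-setup \
  -n seldon-mesh --reuse-values --set kafka.debug=all
```
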
@@ -3,6 +3,7 @@
> New in Seldon Core 2.5.0

Seldon Core v2 can integrate with Confluent Cloud managed Kafka.
In this example we use the SASL security mechanism.


## Create API Keys
@@ -22,11 +23,13 @@ See Confluent Cloud [documentation](https://docs.confluent.io/cloud/current/clie

Seldon Core v2 expects the password to be provided in the form of a Kubernetes Secret:
```bash
kubectl create secret generic confluent-kafka-secret -n seldon-mesh --from-literal password="<Confluent Cloud API Secret>"
kubectl create secret generic confluent-kafka-sasl -n seldon-mesh --from-literal password="<Confluent Cloud API Secret>"
```

## Configure Seldon Core v2

Configure Seldon Core v2 by setting the following Helm values:

```{literalinclude} ../../../../../../k8s/samples/values-confluent-kafka-sasl.yaml.tmpl
:language: yaml
```
@@ -37,6 +37,7 @@ Examples are shown below:
* [SASL PLAIN with Azure Event Hub](azure-event-hub-sasl.md) example
* [SASL SCRAM with Strimzi](strimzi-sasl.md) example
* [SASL SCRAM with AWS MSK](aws-msk-sasl.md) example
* [SASL OAUTH with Confluent Cloud](confluent-oauth.md) example

## Data Plane

@@ -95,6 +96,7 @@ helm install seldon-v2-certs k8s/helm-charts/seldon-core-v2-certs/ -n seldon-mes
strimzi-mtls.md
strimzi-sasl.md
confluent-sasl.md
confluent-oauth.md
azure-event-hub-sasl.md
aws-msk-sasl.md
aws-msk-mtls.md
@@ -32,6 +32,8 @@ kubectl create -f k8s/samples/strimzi-example-tls-user.yaml -n seldon-mesh

Install seldon with the Strimzi certificate secrets using a custom values file. This sets the secret created by Strimzi for the user created above (`seldon`) and targets the server certificate authority secret from the name of the cluster created on install of the Kafka cluster (`seldon-cluster-ca-cert`).

Configure Seldon Core v2 by setting the following Helm values:

```{literalinclude} ../../../../../../k8s/samples/values-strimzi-kafka-mtls.yaml
:language: yaml
```
@@ -45,4 +47,4 @@ You can now go ahead and install a SeldonRuntime in your desired install namespa

```
helm install seldon-v2-runtime ../k8s/helm-charts/seldon-core-v2-runtime -n seldon-mesh
```
```
@@ -18,6 +18,8 @@ This will call the Strimzi cluster Helm chart provided by the project with overr
Install Core v2 with SASL settings using a custom values file.
This sets the secret created by Strimzi for the user created above (`seldon`) and targets the server certificate authority secret from the name of the cluster created on install of the Kafka cluster (`seldon-cluster-ca-cert`).

Configure Seldon Core v2 by setting the following Helm values:

```{literalinclude} ../../../../../../k8s/samples/values-strimzi-kafka-sasl-scram.yaml
:language: yaml
```
1 change: 1 addition & 0 deletions docs/source/contents/kubernetes/kafka/index.md
@@ -18,6 +18,7 @@ This allow to take away all the complexity on running secure and scalable Kafka

We currently have tested and documented integration with following managed solutions:
- Confluent Cloud (security: SASL/PLAIN)
- Confluent Cloud (security: OAuth 2.0)
- Amazon MSK (security: mTLS)
- Amazon MSK (security: SASL/SCRAM)
- Azure Event Hub (security: SASL/PLAIN)
21 changes: 21 additions & 0 deletions k8s/samples/values-confluent-kafka-oauth.yaml.tmpl
@@ -0,0 +1,21 @@
kafka:
  bootstrap: < Confluent Cloud Broker Endpoints >
  topics:
    replicationFactor: 3
    numPartitions: 4
  consumer:
    messageMaxBytes: 8388608
  producer:
    messageMaxBytes: 8388608

security:
  kafka:
    protocol: SASL_SSL
    sasl:
      mechanism: OAUTHBEARER
      client:
        secret: confluent-kafka-oauth
    ssl:
      client:
        secret:
        brokerValidationSecret:
2 changes: 1 addition & 1 deletion k8s/samples/values-confluent-kafka-sasl.yaml.tmpl
@@ -15,7 +15,7 @@ security:
mechanism: "PLAIN"
client:
username: < username >
secret: confluent-kafka-secret
secret: confluent-kafka-sasl
ssl:
client:
secret:
2 changes: 2 additions & 0 deletions k8s/yaml/components.yaml
@@ -1278,6 +1278,8 @@ spec:
- containerPort: 9000
name: server-http
protocol: TCP
- containerPort: 8082
name: server-metrics
readinessProbe:
httpGet:
path: /v2/health/live
10 changes: 6 additions & 4 deletions operator/config/serverconfigs/mlserver.yaml
@@ -40,7 +40,7 @@ spec:
command:
- /bin/agent
args:
- --tracing-config-path=/mnt/tracing/tracing.json
- --tracing-config-path=/mnt/tracing/tracing.json
name: agent
env:
- name: SELDON_SERVER_CAPABILITIES
@@ -56,11 +56,11 @@ spec:
- name: SELDON_SERVER_HTTP_PORT
value: "9000"
- name: SELDON_SERVER_GRPC_PORT
value: "9500"
value: "9500"
- name: SELDON_REVERSE_PROXY_HTTP_PORT
value: "9001"
- name: SELDON_REVERSE_PROXY_GRPC_PORT
value: "9501"
value: "9501"
- name: SELDON_SCHEDULER_HOST
value: "seldon-scheduler"
- name: SELDON_SCHEDULER_PORT
@@ -153,7 +153,7 @@ spec:
path: /v2/health/live
port: server-http
initialDelaySeconds: 5
periodSeconds: 5
periodSeconds: 5
startupProbe:
httpGet:
path: /v2/health/live
@@ -168,6 +168,8 @@ spec:
- containerPort: 9000
name: server-http
protocol: TCP
- containerPort: 8082
name: server-metrics
volumeMounts:
- mountPath: /mnt/agent
name: mlserver-models
5 changes: 5 additions & 0 deletions scheduler/Dockerfile.modelgateway
@@ -12,6 +12,11 @@ FROM registry.access.redhat.com/ubi9/ubi-minimal as certs

# Kafka dependencies necessitate leaving CGo enabled and using a base image with C dependencies
FROM registry.access.redhat.com/ubi9/ubi-micro:9.2-9

# Kafka OIDC token retrieval certs: https://github.com/confluentinc/librdkafka/issues/3751
COPY --from=certs /etc/ssl/certs/ca-bundle.crt /etc/ssl/certs/ca-certificates.crt

# Broker Certificates
COPY --from=certs /etc/ssl/certs/ca-bundle.crt /tmp/certs/kafka/broker/ca.crt
RUN chmod -R 777 /tmp/certs/
COPY --from=builder /build/scheduler/bin/modelgateway /bin/modelgateway
5 changes: 5 additions & 0 deletions scheduler/Dockerfile.pipelinegateway
@@ -12,6 +12,11 @@ FROM registry.access.redhat.com/ubi9/ubi-minimal as certs

# Kafka dependencies necessitate leaving CGo enabled and using a base image with C dependencies
FROM registry.access.redhat.com/ubi9/ubi-micro:9.2-9

# Kafka OIDC token retrieval certs: https://github.com/confluentinc/librdkafka/issues/3751
COPY --from=certs /etc/ssl/certs/ca-bundle.crt /etc/ssl/certs/ca-certificates.crt

# Broker Certificates
COPY --from=certs /etc/ssl/certs/ca-bundle.crt /tmp/certs/kafka/broker/ca.crt
RUN chmod -R 777 /tmp/certs/
COPY --from=builder /build/scheduler/bin/pipelinegateway /bin/pipelinegateway
3 changes: 2 additions & 1 deletion scheduler/go.mod
@@ -5,10 +5,11 @@ go 1.19
require (
github.com/OneOfOne/xxhash v1.2.8
github.com/cenkalti/backoff/v4 v4.1.2
github.com/confluentinc/confluent-kafka-go v1.9.1
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/envoyproxy/go-control-plane v0.11.1
github.com/fsnotify/fsnotify v1.5.1
github.com/ghodss/yaml v1.0.0
github.com/go-playground/validator/v10 v10.9.0
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.5.9
5 changes: 3 additions & 2 deletions scheduler/go.sum
@@ -144,8 +144,8 @@ github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h
github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/confluentinc/confluent-kafka-go v1.9.1 h1:L3aW6KvTyrq/+BOMnDm9xJylhAEoAgqhoaJbMPe3GQI=
github.com/confluentinc/confluent-kafka-go v1.9.1/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo=
github.com/confluentinc/confluent-kafka-go v1.9.2 h1:gV/GxhMBUb03tFWkN+7kdhg+zf+QUM+wVkI9zwh770Q=
github.com/confluentinc/confluent-kafka-go v1.9.2/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@@ -226,6 +226,7 @@ github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWp
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=