Skip to content

Commit

Permalink
initial OAUTH configuration code for Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
RafalSkolasinski committed Aug 21, 2023
1 parent 09270ef commit efea001
Show file tree
Hide file tree
Showing 16 changed files with 430 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ scheduler/mlrepo
scheduler/mnt
scheduler/notebooks
scheduler/venv

# General
go.work
go.work.sum

171 changes: 171 additions & 0 deletions components/tls/pkg/oauth/k8s_secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
Copyright 2023 Seldon Technologies Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package oauth

import (
"context"
"fmt"
"sync"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/seldonio/seldon-core/components/tls/v2/pkg/k8s"
)

type OAUTHSecretHandler struct {
clientset kubernetes.Interface
secretName string
namespace string
stopper chan struct{}
logger log.FieldLogger
mu sync.RWMutex
oauthConfig OAUTHConfig
}

func NewOAUTHSecretHandler(secretName string, clientset kubernetes.Interface, namespace string, prefix string, locationSuffix string, logger log.FieldLogger) (*OAUTHSecretHandler, error) {
if clientset == nil {
var err error
clientset, err = k8s.CreateClientset()
if err != nil {
logger.WithError(err).Error("Failed to create clientset for OAUTH secret handler")
return nil, err
}
}
return &OAUTHSecretHandler{
clientset: clientset,
secretName: secretName,
namespace: namespace,
stopper: make(chan struct{}),
logger: logger,
}, nil
}

func (s *OAUTHSecretHandler) GetOAUTHConfig() OAUTHConfig {
s.mu.RLock()
defer s.mu.RUnlock()
return s.oauthConfig
}

func (s *OAUTHSecretHandler) Stop() {
close(s.stopper)
}

func (s *OAUTHSecretHandler) saveOAUTHFromSecret(secret *corev1.Secret) error {
// Read and Save oauthbearer method
method, ok := secret.Data[methodKey]
if !ok {
return fmt.Errorf("Failed to find %s in secret %s", methodKey, secret.Name)
}
s.oauthConfig.Method = string(method)

// Read and Save oauthbearer client id
clientID, ok := secret.Data[clientIDKey]
if !ok {
return fmt.Errorf("Failed to find %s in secret %s", clientIDKey, secret.Name)
}
s.oauthConfig.ClientID = string(clientID)

// Read and Save oauthbearer client secret
clientSecret, ok := secret.Data[clientSecretKey]
if !ok {
return fmt.Errorf("Failed to find %s in secret %s", clientSecretKey, secret.Name)
}
s.oauthConfig.ClientSecret = string(clientSecret)

// Read and Save oauthbearer token endpoint url
tokenEndpointURL, ok := secret.Data[tokenEndpointURLKey]
if !ok {
return fmt.Errorf("Failed to find %s in secret %s", tokenEndpointURLKey, secret.Name)
}
s.oauthConfig.TokenEndpointURL = string(tokenEndpointURL)

// Read and Save oauthbearer extensions
extensions, ok := secret.Data[extensionsKey]
if !ok {
return fmt.Errorf("Failed to find %s in secret %s", extensionsKey, secret.Name)
}
s.oauthConfig.Extensions = string(extensions)

return nil
}

func (s *OAUTHSecretHandler) onAdd(obj interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
logger := s.logger.WithField("func", "onAdd")
secret := obj.(*corev1.Secret)
if secret.Name == s.secretName {
logger.Infof("OAUTH Secret %s added", s.secretName)
err := s.saveOAUTHFromSecret(secret)
if err != nil {
logger.WithError(err).Errorf("Failed to extract OAUTH from secret %s", secret.Name)
}
}
}

func (s *OAUTHSecretHandler) onUpdate(oldObj, newObj interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
logger := s.logger.WithField("func", "onUpdate")
secret := newObj.(*corev1.Secret)
if secret.Name == s.secretName {
logger.Infof("OAUTH Secret %s updated", s.secretName)
err := s.saveOAUTHFromSecret(secret)
if err != nil {
logger.WithError(err).Errorf("Failed to extract OAUTH from secret %s", secret.Name)
}
}
}

func (s *OAUTHSecretHandler) onDelete(obj interface{}) {
logger := s.logger.WithField("func", "onDelete")
secret := obj.(*corev1.Secret)
if secret.Name == s.secretName {
logger.Warnf("Secret %s deleted", secret.Name)
}
}

func (s *OAUTHSecretHandler) loadOAUTH(secretName string) error {
secret, err := s.clientset.CoreV1().Secrets(s.namespace).Get(context.Background(), secretName, metav1.GetOptions{})
if err != nil {
return err
}
return s.saveOAUTHFromSecret(secret)
}

func (s *OAUTHSecretHandler) GetOAUTHAndWatch() error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.loadOAUTH(s.secretName)
if err != nil {
return err
}
coreInformers := informers.NewSharedInformerFactoryWithOptions(s.clientset, 0, informers.WithNamespace(s.namespace))
coreInformers.Core().V1().Secrets().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onAdd,
UpdateFunc: s.onUpdate,
DeleteFunc: s.onDelete,
})
coreInformers.WaitForCacheSync(s.stopper)
coreInformers.Start(s.stopper)
return nil
}
33 changes: 33 additions & 0 deletions components/tls/pkg/oauth/oauth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2022 Seldon Technologies Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package oauth

const (
methodKey = "method"
clientIDKey = "client_id"
clientSecretKey = "client_secret"
tokenEndpointURLKey = "token_endpoint_url"
extensionsKey = "extensions"
)

type OAUTHConfig struct {
Method string
ClientID string
ClientSecret string
TokenEndpointURL string
Extensions string
}
116 changes: 116 additions & 0 deletions components/tls/pkg/oauth/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
Copyright 2023 Seldon Technologies Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package oauth

import (
"fmt"
"os"

"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"

"github.com/seldonio/seldon-core/components/tls/v2/pkg/util"
)

const (
envSecretSuffix = "_OAUTH_SECRET_NAME"
envNamespace = "POD_NAMESPACE"
)

type funcOAUTHServerOption struct {
f func(options *OAUTHStoreOptions)
}

func (fdo *funcOAUTHServerOption) apply(do *OAUTHStoreOptions) {
fdo.f(do)
}

func newFuncServerOption(f func(options *OAUTHStoreOptions)) *funcOAUTHServerOption {
return &funcOAUTHServerOption{
f: f,
}
}

type OAUTHStore interface {
GetOAUTHConfig() OAUTHConfig
Stop()
}

type OAUTHStoreOption interface {
apply(options *OAUTHStoreOptions)
}

type OAUTHStoreOptions struct {
prefix string
locationSuffix string
clientset kubernetes.Interface
}

func (c OAUTHStoreOptions) String() string {
return fmt.Sprintf("prefix=%s locationSuffix=%s clientset=%v",
c.prefix, c.locationSuffix, c.clientset)
}

func getDefaultOAUTHStoreOptions() OAUTHStoreOptions {
return OAUTHStoreOptions{}
}

func Prefix(prefix string) OAUTHStoreOption {
return newFuncServerOption(func(o *OAUTHStoreOptions) {
o.prefix = prefix
})
}

func LocationSuffix(suffix string) OAUTHStoreOption {
return newFuncServerOption(func(o *OAUTHStoreOptions) {
o.locationSuffix = suffix
})
}
func ClientSet(clientSet kubernetes.Interface) OAUTHStoreOption {
return newFuncServerOption(func(o *OAUTHStoreOptions) {
o.clientset = clientSet
})
}

func NewOAUTHStore(opt ...OAUTHStoreOption) (OAUTHStore, error) {
opts := getDefaultOAUTHStoreOptions()
for _, o := range opt {
o.apply(&opts)
}
logger := logrus.New().WithField("source", "OAUTHStore")
logger.Infof("Options:%s", opts.String())
if secretName, ok := util.GetEnv(opts.prefix, envSecretSuffix); ok {
logger.Infof("Starting new OAUTH k8s secret store for %s from secret %s", opts.prefix, secretName)
namespace, ok := os.LookupEnv(envNamespace)
logger.Infof("Namespace %s", namespace)
if !ok {
return nil, fmt.Errorf("Namespace env var %s not found and needed for OAUTH secret", envNamespace)
}
ps, err := NewOAUTHSecretHandler(secretName, opts.clientset, namespace, opts.prefix, opts.locationSuffix, logger)
if err != nil {
return nil, err
}
err = ps.GetOAUTHAndWatch()
if err != nil {
return nil, err
}
return ps, nil
} else {
// NOT IMPLEMENTED ERROR
return nil, fmt.Errorf("OAUTH mechanism is currently only supported on K8s")
}
}
2 changes: 1 addition & 1 deletion components/tls/pkg/password/k8s_secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *PasswordSecretHandler) onUpdate(oldObj, newObj interface{}) {
logger := s.logger.WithField("func", "onUpdate")
secret := newObj.(*corev1.Secret)
if secret.Name == s.secretName {
logger.Infof("TLS Secret %s updated", s.secretName)
logger.Infof("Password Secret %s updated", s.secretName)
err := s.savePasswordFromSecret(secret)
if err != nil {
logger.WithError(err).Errorf("Failed to extract password from secret %s", secret.Name)
Expand Down
15 changes: 8 additions & 7 deletions components/tls/pkg/password/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ const (
envNamespace = "POD_NAMESPACE"
)

type funcTLSServerOption struct {
type funcPasswordServerOption struct {
f func(options *PasswordStoreOptions)
}

func (fdo *funcTLSServerOption) apply(do *PasswordStoreOptions) {
func (fdo *funcPasswordServerOption) apply(do *PasswordStoreOptions) {
fdo.f(do)
}

func newFuncServerOption(f func(options *PasswordStoreOptions)) *funcTLSServerOption {
return &funcTLSServerOption{
func newFuncServerOption(f func(options *PasswordStoreOptions)) *funcPasswordServerOption {
return &funcPasswordServerOption{
f: f,
}
}
Expand All @@ -61,8 +61,8 @@ type PasswordStoreOptions struct {
}

func (c PasswordStoreOptions) String() string {
return fmt.Sprintf("prefix=%s clientset=%v",
c.prefix, c.clientset)
return fmt.Sprintf("prefix=%s locationSuffix=%s clientset=%v",
c.prefix, c.locationSuffix, c.clientset)
}

func getDefaultPasswordStoreOptions() PasswordStoreOptions {
Expand Down Expand Up @@ -96,8 +96,9 @@ func NewPasswordStore(opt ...PasswordStoreOption) (PasswordStore, error) {
if secretName, ok := util.GetEnv(opts.prefix, envSecretSuffix); ok {
logger.Infof("Starting new password k8s secret store for %s from secret %s", opts.prefix, secretName)
namespace, ok := os.LookupEnv(envNamespace)
logger.Infof("Namespace %s", namespace)
if !ok {
return nil, fmt.Errorf("Namespace env var %s not found and needed for secret TLS", envNamespace)
return nil, fmt.Errorf("Namespace env var %s not found and needed for password secret", envNamespace)
}
ps, err := NewPasswordSecretHandler(secretName, opts.clientset, namespace, opts.prefix, opts.locationSuffix, logger)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions components/tls/pkg/tls/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
EnvSASLMechanismSuffix = "_SASL_MECHANISM"
SASLMechanismSCRAMSHA512 = "SCRAM-SHA-512"
SASLMechanismSCRAMSHA256 = "SCRAM-SHA-256"
SASLMechanismOAUTHBEARER = "OAUTHBEARER"
SASLMechanismPlain = "PLAIN"

EnvEndpointIdentificationMechanismSuffix = "_TLS_ENDPOINT_IDENTIFICATION_ALGORITHM"
Expand Down
Loading

0 comments on commit efea001

Please sign in to comment.