Skip to content

Commit

Permalink
Create configmap for starting dispatcher pods (#4027) (#4101)
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Sep 17, 2024
1 parent ff0b991 commit a488e34
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 13 deletions.
2 changes: 2 additions & 0 deletions control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
Expand Down Expand Up @@ -67,6 +68,7 @@ func main() {
ctx = filteredFactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
auth.OIDCLabelSelector,
eventing.DispatcherLabelSelectorStr,
)
ctx = clientpool.WithKafkaClientPool(ctx)

Expand Down
5 changes: 5 additions & 0 deletions control-plane/pkg/apis/internals/kafka/eventing/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const (
ConfigMapVolumeName = "kafka-resources"

DispatcherVolumeName = "contract-resources"

DataPlanePodKindLabelKey = "app.kubernetes.io/kind"
DispatcherPodKindLabelValue = "kafka-dispatcher"

DispatcherLabelSelectorStr = DataPlanePodKindLabelKey + "=" + DispatcherPodKindLabelValue
)

func ConfigMapNameFromPod(p *corev1.Pod) (string, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package v1alpha1

import (
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -75,3 +77,16 @@ func IsKnownStatefulSet(name string) bool {
name == ChannelStatefulSetName ||
name == BrokerStatefulSetName
}

func GetOwnerKindFromStatefulSetPrefix(name string) (string, bool) {
if strings.HasPrefix(name, SourceStatefulSetName) {
return "KafkaSource", true
}
if strings.HasPrefix(name, ChannelStatefulSetName) {
return "KafkaChannel", true
}
if strings.HasPrefix(name, BrokerStatefulSetName) {
return "Trigger", true
}
return "", false
}
49 changes: 47 additions & 2 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
Expand All @@ -42,7 +44,7 @@ import (
"knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -119,13 +121,15 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I

clientPool := clientpool.Get(ctx)

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)

r := &Reconciler{
SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok },
ConsumerLister: consumer.Get(ctx).Lister(),
InternalsClient: internalsclient.Get(ctx).InternalV1alpha1(),
SecretLister: secretinformer.Get(ctx).Lister(),
ConfigMapLister: configmapinformer.Get(ctx).Lister(),
PodLister: podinformer.Get(ctx).Lister(),
PodLister: dispatcherPodInformer.Lister(),
KubeClient: kubeclient.Get(ctx),
NameGenerator: names.SimpleNameGenerator,
GetKafkaClient: clientPool.GetClient,
Expand Down Expand Up @@ -195,6 +199,47 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I

ResyncOnStatefulSetChange(ctx, globalResync)

dispatcherPodInformer.Informer().AddEventHandler(controller.HandleAll(func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}

kind, ok := kafkainternals.GetOwnerKindFromStatefulSetPrefix(pod.Name)
if !ok {
return
}

cmName, err := eventing.ConfigMapNameFromPod(pod)
if err != nil {
logger.Warnw("Failed to get ConfigMap name from pod", zap.String("pod", pod.Name), zap.Error(err))
return
}
if err := r.ensureContractConfigMapExists(ctx, pod, cmName); err != nil {
logger.Warnw("Failed to ensure ConfigMap for pod exists", zap.String("pod", pod.Name), zap.String("configmap", cmName), zap.Error(err))
return
}

impl.FilteredGlobalResync(
func(obj interface{}) bool {
cg, ok := obj.(*kafkainternals.ConsumerGroup)
if !ok {
return false
}

uf := cg.GetUserFacingResourceRef()
if uf == nil {
return false
}
if strings.EqualFold(kind, uf.Kind) {
return true
}
return false
},
consumerGroupInformer.Informer(),
)
}))

//Todo: ScaledObject informer when KEDA is installed

return impl
Expand Down
16 changes: 13 additions & 3 deletions control-plane/pkg/reconciler/consumergroup/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,30 @@
package consumergroup

import (
"context"
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/sources/v1beta1/kafkasource/fake"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/node/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
"knative.dev/pkg/configmap"
reconcilertesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/sources/v1beta1/kafkasource/fake"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"

kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumer/fake"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup/fake"
Expand All @@ -55,7 +61,11 @@ const (
)

func TestNewController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, func(ctx context.Context) context.Context {
return filteredFactory.WithSelectors(ctx,
eventing.DispatcherLabelSelectorStr,
)
})
ctx, _ = kedaclient.With(ctx)

t.Setenv("SYSTEM_NAMESPACE", systemNamespace)
Expand Down
25 changes: 17 additions & 8 deletions control-plane/pkg/reconciler/consumergroup/evictor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package consumergroup

import (
"context"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -26,21 +27,23 @@ import (
"k8s.io/utils/pointer"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
reconcilertesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
kafkainternalsclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
)

func TestNewEvictor(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

require.NotPanics(t, func() { newEvictor(ctx, zap.String("k", "n")) })
}

func TestEvictorNilPodNoPanic(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

var pod *corev1.Pod

Expand Down Expand Up @@ -74,7 +77,7 @@ func TestEvictorNilPodNoPanic(t *testing.T) {
}

func TestEvictorEvictSuccess(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand Down Expand Up @@ -111,7 +114,7 @@ func TestEvictorEvictSuccess(t *testing.T) {
}

func TestEvictorNoEvictionEmptyPlacement(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand Down Expand Up @@ -145,7 +148,7 @@ func TestEvictorNoEvictionEmptyPlacement(t *testing.T) {
}

func TestEvictorNoEviction(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand Down Expand Up @@ -181,7 +184,7 @@ func TestEvictorNoEviction(t *testing.T) {
}

func TestEvictorEvictSuccessConsumerGroupSchedulingInProgress(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand Down Expand Up @@ -212,7 +215,7 @@ func TestEvictorEvictSuccessConsumerGroupSchedulingInProgress(t *testing.T) {
}

func TestEvictorEvictPodNotFound(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand All @@ -238,7 +241,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) {
require.Nil(t, err)
}
func TestEvictorEvictConsumerGroupNotFound(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"},
Expand All @@ -263,3 +266,9 @@ func TestEvictorEvictConsumerGroupNotFound(t *testing.T) {

require.Nil(t, err)
}

func withFilteredSelectors(ctx context.Context) context.Context {
return filteredFactory.WithSelectors(ctx,
eventing.DispatcherLabelSelectorStr,
)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a488e34

Please sign in to comment.