diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go index a02816515f..420bd0a413 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go @@ -124,6 +124,10 @@ type ConsumerGroupStatus struct { // same Template, but individual replicas also have a consistent identity. // +optional Replicas *int32 `json:"replicas,omitempty"` + + // Selector is the string serialized label selector needed for the scale subresource. + // Defaults to "" + Selector string `json:"selector,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index a7b3859b3d..d59770a915 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -178,6 +178,8 @@ type Reconciler struct { func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { recordExpectedReplicasMetric(ctx, cg) + r.reconcileStatusSelector(cg) + if err := r.reconcileInitialOffset(ctx, cg); err != nil { return cg.MarkInitializeOffsetFailed("InitializeOffset", err) } @@ -259,6 +261,10 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consum return nil } +func (r *Reconciler) reconcileStatusSelector(cg *kafkainternals.ConsumerGroup) { + cg.Status.Selector = labels.SelectorFromValidatedSet(cg.Spec.Selector).String() +} + func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg) if err != nil { diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index 888ae8a56b..a4fd15e19f 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -148,6 +148,7 @@ func TestReconcileKind(t *testing.T) { ConsumerGroupReplicas(2), ConsumerGroupStatusReplicas(0), ConsumerForTrigger(), + ConsumerGroupStatusSelector(ConsumerLabels), ) cg.Status.Placements = []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, @@ -234,6 +235,7 @@ func TestReconcileKind(t *testing.T) { ), )), ConsumerGroupReplicas(2), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupStatusReplicas(0), ConsumerForTrigger(), ) @@ -340,6 +342,7 @@ func TestReconcileKind(t *testing.T) { }), )), ConsumerGroupReplicas(2), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerForTrigger(), ) cg.InitializeConditions() @@ -448,6 +451,7 @@ func TestReconcileKind(t *testing.T) { }), )), ConsumerGroupReplicas(2), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupStatusReplicas(0), ConsumerForTrigger(), ) @@ -572,6 +576,7 @@ func TestReconcileKind(t *testing.T) { }), )), ConsumerGroupReplicas(2), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupStatusReplicas(1), ConsumerForTrigger(), ) @@ -758,6 +763,7 @@ func TestReconcileKind(t *testing.T) { }), )), ConsumerGroupReplicas(2), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerForTrigger(), ) cg.InitializeConditions() @@ -974,6 +980,7 @@ func TestReconcileKind(t *testing.T) { )), ConsumerGroupReplicas(2), ConsumerGroupStatusReplicas(1), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerForTrigger(), ) cg.Status.Placements = []eventingduckv1alpha1.Placement{ @@ -1063,6 +1070,7 @@ func TestReconcileKind(t *testing.T) { ConsumerGroupReplicas(1), ConsumerGroupStatusReplicas(0), ConsumerForTrigger(), + ConsumerGroupStatusSelector(ConsumerLabels), ) cg.Status.Placements = []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, @@ -1144,6 +1152,7 @@ func TestReconcileKind(t *testing.T) { ConsumerGroupIdConfig("my.group.id"), ), )), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupReplicas(2), ConsumerGroupStatusReplicas(0), ConsumerForTrigger(), @@ -1230,6 +1239,7 @@ func TestReconcileKind(t *testing.T) { ConsumerGroupIdConfig("my.group.id"), ), )), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupReplicas(2), ConsumerGroupStatusReplicas(1), ConsumerForTrigger(), @@ -1358,6 +1368,7 @@ func TestReconcileKind(t *testing.T) { ConsumerInitialOffset(sources.OffsetLatest), )), )), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupReplicas(3), ConsumerForTrigger(), ) @@ -1468,6 +1479,7 @@ func TestReconcileKind(t *testing.T) { ConsumerGroupIdConfig("my.group.id"), ), )), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupReplicas(3), ConsumerForTrigger(), ) @@ -1577,6 +1589,7 @@ func TestReconcileKind(t *testing.T) { ConsumerGroupIdConfig("my.group.id"), ), )), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupReplicas(2), ConsumerForTrigger(), ) @@ -1640,6 +1653,7 @@ func TestReconcileKind(t *testing.T) { ), )), ConsumerGroupReplicas(2), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerForTrigger(), ) cg.GetConditionSet().Manage(cg.GetStatus()).InitializeConditions() @@ -1776,6 +1790,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { ), )), ConsumerGroupReplicas(2), + ConsumerGroupStatusSelector(ConsumerLabels), ConsumerGroupStatusReplicas(1), ConsumerForTrigger(), ) diff --git a/control-plane/pkg/reconciler/testing/objects_consumergroup.go b/control-plane/pkg/reconciler/testing/objects_consumergroup.go index e762c1365f..a8a3a6171d 100644 --- a/control-plane/pkg/reconciler/testing/objects_consumergroup.go +++ b/control-plane/pkg/reconciler/testing/objects_consumergroup.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/utils/pointer" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -189,6 +190,12 @@ func ConsumerGroupStatusReplicas(replicas int32) ConsumerGroupOption { } } +func ConsumerGroupStatusSelector(label map[string]string) ConsumerGroupOption { + return func(cg *kafkainternals.ConsumerGroup) { + cg.Status.Selector = labels.SelectorFromSet(label).String() + } +} + func ConsumerGroupReplicasStatus(replicas int32) ConsumerGroupOption { return func(cg *kafkainternals.ConsumerGroup) { cg.Status.Replicas = pointer.Int32(replicas)