Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-v1.15] Set the name value for metrics tags to the correct top-level resource #1284

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -108,6 +109,10 @@ type ConsumerGroupSpec struct {
// OIDCServiceAccountName is the name of service account used for this components
// OIDC authentication.
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`

// TopLevelResourceRef is a reference to a top level resource.
// For a ConsumerGroup associated with a Trigger, a Broker reference will be set.
TopLevelResourceRef *corev1.ObjectReference `json:"topLevelResourceRef,omitempty"`
}

type ConsumerGroupStatus struct {
Expand Down Expand Up @@ -210,6 +215,13 @@ func (cg *ConsumerGroup) GetUserFacingResourceRef() *metav1.OwnerReference {
return nil
}

// GetTopLevelUserFacingResourceRef gets the top level resource reference to the user-facing resources
// that are backed by this ConsumerGroup using the OwnerReference list.
// For example, for a Trigger, it will return a Broker reference.
func (cg *ConsumerGroup) GetTopLevelUserFacingResourceRef() *corev1.ObjectReference {
return cg.Spec.TopLevelResourceRef
}

func (cg *ConsumerGroup) IsNotScheduled() bool {
// We want to return true when:
// - the condition isn't present, or
Expand Down
12 changes: 10 additions & 2 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
corelisters "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/network"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources"

v1 "knative.dev/eventing/pkg/apis/duck/v1"
messaging "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -587,6 +588,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag
},
},
Spec: internalscg.ConsumerGroupSpec{
TopLevelResourceRef: &corev1.ObjectReference{
APIVersion: messagingv1beta1.SchemeGroupVersion.String(),
Kind: "KafkaChannel",
Name: channel.Name,
Namespace: channel.Namespace,
UID: channel.UID,
},
Template: internalscg.ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down
27 changes: 26 additions & 1 deletion control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ import (
messagingv1beta1kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"

"github.com/rickb777/date/period"
eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1"

internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
fakeconsumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1"
)

const (
Expand Down Expand Up @@ -460,6 +461,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -531,6 +533,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -603,6 +606,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -660,6 +664,7 @@ func TestReconcileKind(t *testing.T) {
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewChannel())),
WithConsumerGroupMetaLabels(OwnerAsChannelLabel),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -700,6 +705,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
},
Expand Down Expand Up @@ -753,6 +759,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
WithConsumerGroupFailed("failed to reconcile consumer group,", "internal error"),
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -830,6 +837,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
NewConsumerGroup(
WithConsumerGroupName(Subscription2UUID),
Expand All @@ -847,6 +855,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)),
ConsumerReply(ConsumerNoReply()),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -916,6 +925,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerDelivery(NewConsumerSpecDelivery(kafkasource.Ordered)),
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)),
)),
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand All @@ -937,6 +947,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -1184,6 +1195,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1288,6 +1300,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1391,6 +1404,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1488,6 +1502,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -2168,3 +2183,13 @@ func httpsURL(name string, namespace string) *apis.URL {
Path: fmt.Sprintf("/%s/%s", namespace, name),
}
}

func withChannelTopLevelResourceRef() ConsumerGroupOption {
return WithTopLevelResourceRef(&corev1.ObjectReference{
APIVersion: messagingv1beta.SchemeGroupVersion.String(),
Kind: "KafkaChannel",
Namespace: ChannelNamespace,
Name: ChannelName,
UID: ChannelUUID,
})
}
35 changes: 34 additions & 1 deletion control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,22 @@ func (r *Reconciler) reconcileContractResource(ctx context.Context, c *kafkainte
egress.VReplicas = 1
}

topLevelUserFacingResourceRef, err := r.reconcileTopLevelUserFacingResourceRef(c)
if err != nil {
return nil, fmt.Errorf("failed to reconcile top-level user facing resource reference: %w", err)
}
if topLevelUserFacingResourceRef == nil {
topLevelUserFacingResourceRef = userFacingResourceRef
}

resource := &contract.Resource{
Uid: string(c.UID),
Topics: c.Spec.Topics,
BootstrapServers: c.Spec.Configs.Configs["bootstrap.servers"],
Egresses: []*contract.Egress{egress},
Auth: nil, // Auth will be added by reconcileAuth
CloudEventOverrides: reconcileCEOverrides(c),
Reference: userFacingResourceRef,
Reference: topLevelUserFacingResourceRef,
}

if err := r.reconcileAuth(ctx, c, resource); err != nil {
Expand Down Expand Up @@ -296,6 +304,31 @@ func (r *Reconciler) reconcileUserFacingResourceRef(c *kafkainternals.Consumer)
return ref, nil
}

func (r *Reconciler) reconcileTopLevelUserFacingResourceRef(c *kafkainternals.Consumer) (*contract.Reference, error) {

cg, err := r.ConsumerGroupLister.ConsumerGroups(c.GetNamespace()).Get(c.GetConsumerGroup().Name)
if apierrors.IsNotFound(err) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get %s: %w", kafkainternals.ConsumerGroupGroupVersionKind.Kind, err)
}

userFacingResource := cg.GetTopLevelUserFacingResourceRef()
if userFacingResource == nil {
return nil, nil
}

ref := &contract.Reference{
Uuid: string(userFacingResource.UID),
Namespace: c.GetNamespace(),
Name: userFacingResource.Name,
Kind: userFacingResource.Kind,
GroupVersion: userFacingResource.APIVersion,
}
return ref, nil
}

func reconcileDeliveryOrder(c *kafkainternals.Consumer) contract.DeliveryOrder {
if c.Spec.Delivery == nil {
return contract.DeliveryOrder_UNORDERED
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,9 @@ func WithConfigmapOwnerRef(ownerref *metav1.OwnerReference) reconcilertesting.Co
cg.ObjectMeta.OwnerReferences = []metav1.OwnerReference{*ownerref}
}
}

func WithTopLevelResourceRef(ref *corev1.ObjectReference) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Spec.TopLevelResourceRef = ref
}
}
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/trigger/v2/triggerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, broker *eventin
},
},
Spec: internalscg.ConsumerGroupSpec{
TopLevelResourceRef: &corev1.ObjectReference{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Broker",
Name: broker.Name,
Namespace: broker.Namespace,
UID: broker.UID,
},
Template: internalscg.ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down
Loading