Skip to content

Commit

Permalink
Merge branch 'main' into migrate-to-kubecodegen
Browse files Browse the repository at this point in the history
  • Loading branch information
pierDipi committed Oct 9, 2024
2 parents 2a72a75 + b9233fe commit 825c433
Show file tree
Hide file tree
Showing 228 changed files with 20,472 additions and 14,669 deletions.
10 changes: 2 additions & 8 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ aliases:
eventing-natss-approvers:
- astelmashenko
- dan-j
- zhaojizhuang
eventing-prometheus-approvers:
- lberk
- matzew
Expand Down Expand Up @@ -118,33 +117,29 @@ aliases:
- salaboy
homebrew-kn-plugins-approvers:
- dsimansk
- maximilien
- rhuss
kn-plugin-admin-approvers:
- maximilien
- dsimansk
- rhuss
- zhanggbj
kn-plugin-event-approvers:
- cardil
- rhuss
kn-plugin-operator-approvers:
- dsimansk
- houshengbo
- maximilien
- rhuss
kn-plugin-quickstart-approvers:
- dsimansk
- psschwei
- rhuss
kn-plugin-sample-approvers:
- maximilien
- dsimansk
- rhuss
kn-plugin-service-log-approvers:
- rhuss
kn-plugin-source-kafka-approvers:
- daisy-ycguo
- dsimansk
- maximilien
- rhuss
kn-plugin-source-kamelet-approvers:
- christophd
Expand Down Expand Up @@ -199,7 +194,6 @@ aliases:
- upodroid
security-guard-approvers:
- davidhadas
- maximilien
- psschwei
- rhuss
security-wg-leads:
Expand Down
14 changes: 13 additions & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package main
import (
"context"
"log"
"os"
"strings"

"github.com/IBM/sarama"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -70,7 +73,16 @@ func main() {
auth.OIDCLabelSelector,
kafkainternals.DispatcherLabelSelectorStr,

Check warning on line 74 in control-plane/cmd/kafka-controller/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/kafka-controller/main.go#L74

Added line #L74 was not covered by tests
)
ctx = clientpool.WithKafkaClientPool(ctx)

if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") {
sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") {
ctx = clientpool.WithKafkaClientPool(ctx)
}

sharedmain.MainNamed(ctx, component,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ENABLE_SARAMA_LOGGER
value: "false"
- name: ENABLE_SARAMA_DEBUG_LOGGER
value: "false"
- name: ENABLE_SARAMA_CLIENT_POOL
value: "true"

ports:
- containerPort: 9090
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ spec:
labels:
app: kafka-controller-post-install
app.kubernetes.io/version: devel
sidecar.istio.io/inject: "false"
annotations:
sidecar.istio.io/inject: "false"
spec:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ spec:
labels:
app: "knative-kafka-storage-version-migrator"
app.kubernetes.io/version: devel
sidecar.istio.io/inject: "false"
annotations:
sidecar.istio.io/inject: "false"
spec:
Expand Down
39 changes: 39 additions & 0 deletions control-plane/pkg/apis/eventing/v1alpha1/implements_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1alpha1

import (
"testing"

"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

func TestTypesImplements(t *testing.T) {
testCases := []struct {
instance interface{}
iface duck.Implementable
}{
{instance: &KafkaSink{}, iface: &duckv1.Conditions{}},
{instance: &KafkaSink{}, iface: &duckv1.Addressable{}},
}
for _, tc := range testCases {
if err := duck.VerifyType(tc.instance, tc.iface); err != nil {
t.Error(err)
}
}
}
19 changes: 18 additions & 1 deletion control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
)

const (
ConditionAddressable apis.ConditionType = "Addressable"
ConditionAddressable apis.ConditionType = "Addressable"
ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

var conditionSet apis.ConditionSet
Expand Down Expand Up @@ -54,3 +55,19 @@ func (ks *KafkaSinkStatus) SetAddress(addr *duckv1.Addressable) {
func (kss *KafkaSinkStatus) InitializeConditions() {
kss.GetConditionSet().Manage(kss).InitializeConditions()
}

func (kss *KafkaSinkStatus) MarkEventPoliciesTrue() {
kss.GetConditionSet().Manage(kss).MarkTrue(ConditionEventPoliciesReady)
}

func (kss *KafkaSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kss *KafkaSinkStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kss *KafkaSinkStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -108,6 +109,10 @@ type ConsumerGroupSpec struct {
// OIDCServiceAccountName is the name of service account used for this components
// OIDC authentication.
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`

// TopLevelResourceRef is a reference to a top level resource.
// For a ConsumerGroup associated with a Trigger, a Broker reference will be set.
TopLevelResourceRef *corev1.ObjectReference `json:"topLevelResourceRef,omitempty"`
}

type ConsumerGroupStatus struct {
Expand Down Expand Up @@ -210,6 +215,13 @@ func (cg *ConsumerGroup) GetUserFacingResourceRef() *metav1.OwnerReference {
return nil
}

// GetTopLevelUserFacingResourceRef gets the top level resource reference to the user-facing resources
// that are backed by this ConsumerGroup using the OwnerReference list.
// For example, for a Trigger, it will return a Broker reference.
func (cg *ConsumerGroup) GetTopLevelUserFacingResourceRef() *corev1.ObjectReference {
return cg.Spec.TopLevelResourceRef
}

func (cg *ConsumerGroup) IsNotScheduled() bool {
// We want to return true when:
// - the condition isn't present, or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
// KafkaChannelConditionChannelServiceReady has status True when the K8S Service representing the channel
// is ready. Because this uses ExternalName, there are no endpoints to check.
KafkaChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady"

ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

// RegisterAlternateKafkaChannelConditionSet register a different apis.ConditionSet.
Expand Down Expand Up @@ -129,3 +131,19 @@ func (kcs *KafkaChannelStatus) MarkChannelServiceFailed(reason, messageFormat st
func (kcs *KafkaChannelStatus) MarkChannelServiceTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionChannelServiceReady)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(ConditionEventPoliciesReady)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
39 changes: 39 additions & 0 deletions control-plane/pkg/apis/sources/v1beta1/implements_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1beta1

import (
"testing"

"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

func TestTypesImplements(t *testing.T) {
testCases := []struct {
instance interface{}
iface duck.Implementable
}{
{instance: &KafkaSource{}, iface: &duckv1.Conditions{}},
{instance: &KafkaSource{}, iface: &duckv1.Source{}},
}
for _, tc := range testCases {
if err := duck.VerifyType(tc.instance, tc.iface); err != nil {
t.Error(err)
}
}
}
24 changes: 14 additions & 10 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"sort"
"strings"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"

"github.com/rickb777/date/period"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
duck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/pkg/resolver"

Expand All @@ -55,14 +55,18 @@ func ContentModeFromString(mode string) contract.ContentMode {
}
}

// EventPoliciesFromAppliedEventPoliciesStatus resolves a AppliedEventPoliciesStatus into a list of contract.EventPolicy
func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPoliciesStatus, lister v1alpha1.EventPolicyLister, namespace string, features feature.Flags) ([]*contract.EventPolicy, error) {
eventPolicies := make([]*contract.EventPolicy, 0, len(status.Policies))
// ContractEventPoliciesFromEventPolicies resolves a list of v1alpha1.EventPolicy into a list of contract.EventPolicy
func ContractEventPoliciesFromEventPolicies(applyingEventPolicies []*eventingv1alpha1.EventPolicy, namespace string, features feature.Flags) []*contract.EventPolicy {
if !features.IsOIDCAuthentication() {
return nil
}

for _, appliedPolicy := range status.Policies {
policy, err := lister.EventPolicies(namespace).Get(appliedPolicy.Name)
if err != nil {
return nil, fmt.Errorf("failed to get eventPolicy %s: %w", appliedPolicy.Name, err)
eventPolicies := make([]*contract.EventPolicy, 0, len(applyingEventPolicies))

for _, policy := range applyingEventPolicies {
if !policy.Status.IsReady() {
// only add ready eventpolicies to the contract
continue
}

contractPolicy := &contract.EventPolicy{}
Expand Down Expand Up @@ -132,7 +136,7 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie
// else: deny all -> add no additional policy
}

return eventPolicies, nil
return eventPolicies
}

func EgressConfigFromDelivery(
Expand Down
Loading

0 comments on commit 825c433

Please sign in to comment.