diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index 1b7a99902b..64cc211085 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -449,7 +449,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr return cg.MarkScheduleConsumerFailed("Schedule", err) } - placements, err := statefulSetScheduler.Schedule(cg) + placements, err := statefulSetScheduler.Schedule(ctx, cg) if err != nil { return cg.MarkScheduleConsumerFailed("Schedule", err) } diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index 536d0574aa..f1c37fd574 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -61,10 +61,10 @@ import ( kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake" ) -type SchedulerFunc func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) +type SchedulerFunc func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) -func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { - return f(vpod) +func (f SchedulerFunc) Schedule(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + return f(ctx, vpod) } const ( @@ -102,7 +102,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -189,7 +189,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -307,7 +307,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -402,7 +402,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -528,7 +528,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -702,7 +702,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -877,7 +877,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1034,7 +1034,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, }, nil @@ -1121,7 +1121,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1208,7 +1208,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1303,7 +1303,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 2}, @@ -1426,7 +1426,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 2}, @@ -1533,7 +1533,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1630,7 +1630,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, io.EOF }), }, @@ -1762,7 +1762,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1926,7 +1926,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -1995,7 +1995,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -2121,7 +2121,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -2167,7 +2167,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrUnknownTopicOrPartition, @@ -2214,7 +2214,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrGroupIDNotFound, @@ -2262,7 +2262,7 @@ func TestFinalizeKind(t *testing.T) { WantErr: true, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: SchedulerFunc(func(_ context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrClusterAuthorizationFailed, diff --git a/go.mod b/go.mod index 6b5c5c1158..0385caab79 100644 --- a/go.mod +++ b/go.mod @@ -35,10 +35,10 @@ require ( k8s.io/apiserver v0.30.3 k8s.io/client-go v0.30.3 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 - knative.dev/eventing v0.42.1-0.20240918141338-17088813b4e0 + knative.dev/eventing v0.42.1-0.20240926123447-e7fca7646f4a knative.dev/hack v0.0.0-20240909014011-fc6a8452af6d - knative.dev/pkg v0.0.0-20240917091217-aaab500c26c4 - knative.dev/reconciler-test v0.0.0-20240919063827-0cb8938be2e4 + knative.dev/pkg v0.0.0-20240930065954-503173341499 + knative.dev/reconciler-test v0.0.0-20240926123451-87d857060042 sigs.k8s.io/controller-runtime v0.12.3 sigs.k8s.io/yaml v1.4.0 ) @@ -72,7 +72,7 @@ require ( github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect - github.com/gobuffalo/flect v1.0.2 // indirect + github.com/gobuffalo/flect v1.0.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect @@ -113,7 +113,7 @@ require ( github.com/wavesoftware/go-ensure v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect - go.uber.org/automaxprocs v1.5.3 // indirect + go.uber.org/automaxprocs v1.6.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.29.0 // indirect @@ -126,9 +126,9 @@ require ( golang.org/x/tools v0.25.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/api v0.183.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a // indirect - google.golang.org/grpc v1.66.2 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect + google.golang.org/grpc v1.67.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 46dc8f50a8..31925d6ed3 100644 --- a/go.sum +++ b/go.sum @@ -275,8 +275,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= -github.com/gobuffalo/flect v1.0.2 h1:eqjPGSo2WmjgY2XlpGwo2NXgL3RucAKo4k4qQMNA5sA= -github.com/gobuffalo/flect v1.0.2/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnDvkbYKHs= +github.com/gobuffalo/flect v1.0.3 h1:xeWBM2nui+qnVvNM4S3foBhCAL2XgPU+a7FdpelbTq4= +github.com/gobuffalo/flect v1.0.3/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnDvkbYKHs= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -709,8 +709,8 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= -go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= @@ -1076,10 +1076,10 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201019141844-1ed22bb0c154/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a h1:KyUe15n7B1YCu+kMmPtlXxgkLQbp+Dw0tCRZf9Sd+CE= -google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a/go.mod h1:4+X6GvPs+25wZKbQq9qyAXrwIRExv7w0Ea6MgZLZiDM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a h1:EKiZZXueP9/T68B8Nl0GAx9cjbQnCId0yP3qPMgaaHs= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1:wKguEg1hsxI2/L3hUYrpo1RVi48K+uTyzKqprwLXsb8= +google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -1095,8 +1095,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= -google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw= +google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -1214,14 +1214,14 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -knative.dev/eventing v0.42.1-0.20240918141338-17088813b4e0 h1:ZHRNK5wxqQS5NoOYBE7V0Mr8fGcH4sGPpV7HAfTmZlI= -knative.dev/eventing v0.42.1-0.20240918141338-17088813b4e0/go.mod h1:CguA8wPeeeED9ZIAJ+NqCo8fGj1W3gkEvTQs7Enk/oo= +knative.dev/eventing v0.42.1-0.20240926123447-e7fca7646f4a h1:HnJ8kus8avX0oMuzA1K3mKKV+mZJ32kJGqvtIYcoeEw= +knative.dev/eventing v0.42.1-0.20240926123447-e7fca7646f4a/go.mod h1:CguA8wPeeeED9ZIAJ+NqCo8fGj1W3gkEvTQs7Enk/oo= knative.dev/hack v0.0.0-20240909014011-fc6a8452af6d h1:mgROhGJG3+g0SBkaG4Y2HxrIOLN3ZZcN4+IFZla+Zqs= knative.dev/hack v0.0.0-20240909014011-fc6a8452af6d/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY= -knative.dev/pkg v0.0.0-20240917091217-aaab500c26c4 h1:1yMPCa3CnWH8darWwC3YxBJC19ZvE/XNA4RtNnxKPDM= -knative.dev/pkg v0.0.0-20240917091217-aaab500c26c4/go.mod h1:ZK0e9aChRwXJCpT8cypwvn/bJYTo6ygmyjiaz0E32EY= -knative.dev/reconciler-test v0.0.0-20240919063827-0cb8938be2e4 h1:64AsOs3D0kdTVC9eAtY7jbVh0BaCSZfTtMgIYaQWHgo= -knative.dev/reconciler-test v0.0.0-20240919063827-0cb8938be2e4/go.mod h1:nXvaMk3Czw2mi5J4A3aQNRi9aEJPJh/SQJdSL/8qBFM= +knative.dev/pkg v0.0.0-20240930065954-503173341499 h1:5xOSRTSjmakkXWtFFWtNTlNcks0FTN7T7wHeFrWR0qg= +knative.dev/pkg v0.0.0-20240930065954-503173341499/go.mod h1:Mh16+83vjH4yF2fTRQLiErZ1RTawIu5HMTKFVCnxx3U= +knative.dev/reconciler-test v0.0.0-20240926123451-87d857060042 h1:iex7NiH53E+EDGdC7ekbr3YL0qVlONHvZOYLra76y1Y= +knative.dev/reconciler-test v0.0.0-20240926123451-87d857060042/go.mod h1:PXOqfSSDHzaVPXrpEPlxsOSQRIQJGnSrj2IuVQh3Kas= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/vendor/github.com/gobuffalo/flect/humanize.go b/vendor/github.com/gobuffalo/flect/humanize.go index 311c8beed4..5100bfb7e7 100644 --- a/vendor/github.com/gobuffalo/flect/humanize.go +++ b/vendor/github.com/gobuffalo/flect/humanize.go @@ -7,6 +7,7 @@ import ( // Humanize returns first letter of sentence capitalized. // Common acronyms are capitalized as well. // Other capital letters in string are left as provided. +// // employee_salary = Employee salary // employee_id = employee ID // employee_mobile_number = Employee mobile number @@ -22,6 +23,10 @@ func (i Ident) Humanize() Ident { return New("") } + if strings.TrimSpace(i.Original) == "" { + return i + } + parts := xappend([]string{}, Titleize(i.Parts[0])) if len(i.Parts) > 1 { parts = xappend(parts, i.Parts[1:]...) diff --git a/vendor/go.uber.org/automaxprocs/internal/runtime/cpu_quota_linux.go b/vendor/go.uber.org/automaxprocs/internal/runtime/cpu_quota_linux.go index 3b974754c3..f9057fd273 100644 --- a/vendor/go.uber.org/automaxprocs/internal/runtime/cpu_quota_linux.go +++ b/vendor/go.uber.org/automaxprocs/internal/runtime/cpu_quota_linux.go @@ -25,15 +25,18 @@ package runtime import ( "errors" - "math" cg "go.uber.org/automaxprocs/internal/cgroups" ) // CPUQuotaToGOMAXPROCS converts the CPU quota applied to the calling process -// to a valid GOMAXPROCS value. -func CPUQuotaToGOMAXPROCS(minValue int) (int, CPUQuotaStatus, error) { - cgroups, err := newQueryer() +// to a valid GOMAXPROCS value. The quota is converted from float to int using round. +// If round == nil, DefaultRoundFunc is used. +func CPUQuotaToGOMAXPROCS(minValue int, round func(v float64) int) (int, CPUQuotaStatus, error) { + if round == nil { + round = DefaultRoundFunc + } + cgroups, err := _newQueryer() if err != nil { return -1, CPUQuotaUndefined, err } @@ -43,7 +46,7 @@ func CPUQuotaToGOMAXPROCS(minValue int) (int, CPUQuotaStatus, error) { return -1, CPUQuotaUndefined, err } - maxProcs := int(math.Floor(quota)) + maxProcs := round(quota) if minValue > 0 && maxProcs < minValue { return minValue, CPUQuotaMinUsed, nil } @@ -57,6 +60,7 @@ type queryer interface { var ( _newCgroups2 = cg.NewCGroups2ForCurrentProcess _newCgroups = cg.NewCGroupsForCurrentProcess + _newQueryer = newQueryer ) func newQueryer() (queryer, error) { diff --git a/vendor/go.uber.org/automaxprocs/internal/runtime/cpu_quota_unsupported.go b/vendor/go.uber.org/automaxprocs/internal/runtime/cpu_quota_unsupported.go index 6922554484..e74701508e 100644 --- a/vendor/go.uber.org/automaxprocs/internal/runtime/cpu_quota_unsupported.go +++ b/vendor/go.uber.org/automaxprocs/internal/runtime/cpu_quota_unsupported.go @@ -26,6 +26,6 @@ package runtime // CPUQuotaToGOMAXPROCS converts the CPU quota applied to the calling process // to a valid GOMAXPROCS value. This is Linux-specific and not supported in the // current OS. -func CPUQuotaToGOMAXPROCS(_ int) (int, CPUQuotaStatus, error) { +func CPUQuotaToGOMAXPROCS(_ int, _ func(v float64) int) (int, CPUQuotaStatus, error) { return -1, CPUQuotaUndefined, nil } diff --git a/vendor/go.uber.org/automaxprocs/internal/runtime/runtime.go b/vendor/go.uber.org/automaxprocs/internal/runtime/runtime.go index df6eacf053..f8a2834ac0 100644 --- a/vendor/go.uber.org/automaxprocs/internal/runtime/runtime.go +++ b/vendor/go.uber.org/automaxprocs/internal/runtime/runtime.go @@ -20,6 +20,8 @@ package runtime +import "math" + // CPUQuotaStatus presents the status of how CPU quota is used type CPUQuotaStatus int @@ -31,3 +33,8 @@ const ( // CPUQuotaMinUsed is returned when CPU quota is smaller than the min value CPUQuotaMinUsed ) + +// DefaultRoundFunc is the default function to convert CPU quota from float to int. It rounds the value down (floor). +func DefaultRoundFunc(v float64) int { + return int(math.Floor(v)) +} diff --git a/vendor/go.uber.org/automaxprocs/maxprocs/maxprocs.go b/vendor/go.uber.org/automaxprocs/maxprocs/maxprocs.go index 98176d6457..e561fe60b2 100644 --- a/vendor/go.uber.org/automaxprocs/maxprocs/maxprocs.go +++ b/vendor/go.uber.org/automaxprocs/maxprocs/maxprocs.go @@ -37,9 +37,10 @@ func currentMaxProcs() int { } type config struct { - printf func(string, ...interface{}) - procs func(int) (int, iruntime.CPUQuotaStatus, error) - minGOMAXPROCS int + printf func(string, ...interface{}) + procs func(int, func(v float64) int) (int, iruntime.CPUQuotaStatus, error) + minGOMAXPROCS int + roundQuotaFunc func(v float64) int } func (c *config) log(fmt string, args ...interface{}) { @@ -71,6 +72,13 @@ func Min(n int) Option { }) } +// RoundQuotaFunc sets the function that will be used to covert the CPU quota from float to int. +func RoundQuotaFunc(rf func(v float64) int) Option { + return optionFunc(func(cfg *config) { + cfg.roundQuotaFunc = rf + }) +} + type optionFunc func(*config) func (of optionFunc) apply(cfg *config) { of(cfg) } @@ -82,8 +90,9 @@ func (of optionFunc) apply(cfg *config) { of(cfg) } // configured CPU quota. func Set(opts ...Option) (func(), error) { cfg := &config{ - procs: iruntime.CPUQuotaToGOMAXPROCS, - minGOMAXPROCS: 1, + procs: iruntime.CPUQuotaToGOMAXPROCS, + roundQuotaFunc: iruntime.DefaultRoundFunc, + minGOMAXPROCS: 1, } for _, o := range opts { o.apply(cfg) @@ -102,7 +111,7 @@ func Set(opts ...Option) (func(), error) { return undoNoop, nil } - maxProcs, status, err := cfg.procs(cfg.minGOMAXPROCS) + maxProcs, status, err := cfg.procs(cfg.minGOMAXPROCS, cfg.roundQuotaFunc) if err != nil { return undoNoop, err } diff --git a/vendor/go.uber.org/automaxprocs/maxprocs/version.go b/vendor/go.uber.org/automaxprocs/maxprocs/version.go index 108a95535e..cc7fc5aee1 100644 --- a/vendor/go.uber.org/automaxprocs/maxprocs/version.go +++ b/vendor/go.uber.org/automaxprocs/maxprocs/version.go @@ -21,4 +21,4 @@ package maxprocs // Version is the current package version. -const Version = "1.5.2" +const Version = "1.6.0" diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go index a7f1eeec8e..2b87bd79c7 100644 --- a/vendor/google.golang.org/grpc/balancer/base/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go @@ -36,7 +36,7 @@ type baseBuilder struct { config Config } -func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { +func (bb *baseBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { bal := &baseBalancer{ cc: cc, pickerBuilder: bb.pickerBuilder, @@ -259,6 +259,6 @@ type errPicker struct { err error // Pick() always returns this err. } -func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { +func (p *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { return balancer.PickResult{}, p.err } diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go index 5b592f48ad..4d69b4052f 100644 --- a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go +++ b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go @@ -50,7 +50,7 @@ const ( type pickfirstBuilder struct{} -func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { +func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { b := &pickfirstBalancer{cc: cc} b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) return b diff --git a/vendor/google.golang.org/grpc/balancer_wrapper.go b/vendor/google.golang.org/grpc/balancer_wrapper.go index 6561b769eb..8ad6ce2f09 100644 --- a/vendor/google.golang.org/grpc/balancer_wrapper.go +++ b/vendor/google.golang.org/grpc/balancer_wrapper.go @@ -192,7 +192,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer return acbw, nil } -func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { +func (ccb *ccBalancerWrapper) RemoveSubConn(balancer.SubConn) { // The graceful switch balancer will never call this. logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc") } @@ -342,8 +342,8 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) ( pData := acbw.producers[pb] if pData == nil { // Not found; create a new one and add it to the producers map. - p, close := pb.Build(acbw) - pData = &refCountedProducer{producer: p, close: close} + p, closeFn := pb.Build(acbw) + pData = &refCountedProducer{producer: p, close: closeFn} acbw.producers[pb] = pData } // Account for this new reference. diff --git a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go index fcd1cfe802..55bffaa77e 100644 --- a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go +++ b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go @@ -18,7 +18,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 +// protoc-gen-go v1.34.2 // protoc v5.27.1 // source: grpc/binlog/v1/binarylog.proto @@ -1015,7 +1015,7 @@ func file_grpc_binlog_v1_binarylog_proto_rawDescGZIP() []byte { var file_grpc_binlog_v1_binarylog_proto_enumTypes = make([]protoimpl.EnumInfo, 3) var file_grpc_binlog_v1_binarylog_proto_msgTypes = make([]protoimpl.MessageInfo, 8) -var file_grpc_binlog_v1_binarylog_proto_goTypes = []interface{}{ +var file_grpc_binlog_v1_binarylog_proto_goTypes = []any{ (GrpcLogEntry_EventType)(0), // 0: grpc.binarylog.v1.GrpcLogEntry.EventType (GrpcLogEntry_Logger)(0), // 1: grpc.binarylog.v1.GrpcLogEntry.Logger (Address_Type)(0), // 2: grpc.binarylog.v1.Address.Type @@ -1058,7 +1058,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_grpc_binlog_v1_binarylog_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_grpc_binlog_v1_binarylog_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*GrpcLogEntry); i { case 0: return &v.state @@ -1070,7 +1070,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { return nil } } - file_grpc_binlog_v1_binarylog_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_grpc_binlog_v1_binarylog_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*ClientHeader); i { case 0: return &v.state @@ -1082,7 +1082,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { return nil } } - file_grpc_binlog_v1_binarylog_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_grpc_binlog_v1_binarylog_proto_msgTypes[2].Exporter = func(v any, i int) any { switch v := v.(*ServerHeader); i { case 0: return &v.state @@ -1094,7 +1094,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { return nil } } - file_grpc_binlog_v1_binarylog_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_grpc_binlog_v1_binarylog_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*Trailer); i { case 0: return &v.state @@ -1106,7 +1106,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { return nil } } - file_grpc_binlog_v1_binarylog_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + file_grpc_binlog_v1_binarylog_proto_msgTypes[4].Exporter = func(v any, i int) any { switch v := v.(*Message); i { case 0: return &v.state @@ -1118,7 +1118,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { return nil } } - file_grpc_binlog_v1_binarylog_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + file_grpc_binlog_v1_binarylog_proto_msgTypes[5].Exporter = func(v any, i int) any { switch v := v.(*Metadata); i { case 0: return &v.state @@ -1130,7 +1130,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { return nil } } - file_grpc_binlog_v1_binarylog_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + file_grpc_binlog_v1_binarylog_proto_msgTypes[6].Exporter = func(v any, i int) any { switch v := v.(*MetadataEntry); i { case 0: return &v.state @@ -1142,7 +1142,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { return nil } } - file_grpc_binlog_v1_binarylog_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + file_grpc_binlog_v1_binarylog_proto_msgTypes[7].Exporter = func(v any, i int) any { switch v := v.(*Address); i { case 0: return &v.state @@ -1155,7 +1155,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() { } } } - file_grpc_binlog_v1_binarylog_proto_msgTypes[0].OneofWrappers = []interface{}{ + file_grpc_binlog_v1_binarylog_proto_msgTypes[0].OneofWrappers = []any{ (*GrpcLogEntry_ClientHeader)(nil), (*GrpcLogEntry_ServerHeader)(nil), (*GrpcLogEntry_Message)(nil), diff --git a/vendor/google.golang.org/grpc/credentials/insecure/insecure.go b/vendor/google.golang.org/grpc/credentials/insecure/insecure.go index 82bee1443b..4c805c6446 100644 --- a/vendor/google.golang.org/grpc/credentials/insecure/insecure.go +++ b/vendor/google.golang.org/grpc/credentials/insecure/insecure.go @@ -40,7 +40,7 @@ func NewCredentials() credentials.TransportCredentials { // NoSecurity. type insecureTC struct{} -func (insecureTC) ClientHandshake(ctx context.Context, _ string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) { +func (insecureTC) ClientHandshake(_ context.Context, _ string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) { return conn, info{credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}}, nil } diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index 27c1b9bb63..2b285beee3 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -518,6 +518,8 @@ func WithUserAgent(s string) DialOption { // WithKeepaliveParams returns a DialOption that specifies keepalive parameters // for the client transport. +// +// Keepalive is disabled by default. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { if kp.Time < internal.KeepaliveMinPingTime { logger.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime) diff --git a/vendor/google.golang.org/grpc/grpclog/internal/logger.go b/vendor/google.golang.org/grpc/grpclog/internal/logger.go index 0d9a824ce1..e524fdd40b 100644 --- a/vendor/google.golang.org/grpc/grpclog/internal/logger.go +++ b/vendor/google.golang.org/grpc/grpclog/internal/logger.go @@ -81,7 +81,7 @@ func (l *LoggerWrapper) Errorf(format string, args ...any) { } // V reports whether verbosity level l is at least the requested verbose level. -func (*LoggerWrapper) V(l int) bool { +func (*LoggerWrapper) V(int) bool { // Returns true for all verbose level. return true } diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go index e65cf0ea15..d92335445f 100644 --- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 +// protoc-gen-go v1.34.2 // protoc v5.27.1 // source: grpc/health/v1/health.proto @@ -237,7 +237,7 @@ func file_grpc_health_v1_health_proto_rawDescGZIP() []byte { var file_grpc_health_v1_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_grpc_health_v1_health_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_grpc_health_v1_health_proto_goTypes = []interface{}{ +var file_grpc_health_v1_health_proto_goTypes = []any{ (HealthCheckResponse_ServingStatus)(0), // 0: grpc.health.v1.HealthCheckResponse.ServingStatus (*HealthCheckRequest)(nil), // 1: grpc.health.v1.HealthCheckRequest (*HealthCheckResponse)(nil), // 2: grpc.health.v1.HealthCheckResponse @@ -261,7 +261,7 @@ func file_grpc_health_v1_health_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_grpc_health_v1_health_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_grpc_health_v1_health_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*HealthCheckRequest); i { case 0: return &v.state @@ -273,7 +273,7 @@ func file_grpc_health_v1_health_proto_init() { return nil } } - file_grpc_health_v1_health_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_grpc_health_v1_health_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*HealthCheckResponse); i { case 0: return &v.state diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go index aa4505a871..9669328914 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go @@ -106,7 +106,7 @@ func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry } // Log creates a proto binary log entry, and logs it to the sink. -func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) { +func (ml *TruncatingMethodLogger) Log(_ context.Context, c LogEntryConfig) { ml.sink.Write(ml.Build(c)) } diff --git a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go index bb531225d5..64c791953d 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go +++ b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go @@ -234,13 +234,6 @@ func copyMap(m map[int64]string) map[int64]string { return n } -func min(a, b int) int { - if a < b { - return a - } - return b -} - func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) { if maxResults <= 0 { maxResults = EntriesPerPage diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go index 03e24e1507..078bb81238 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go +++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go @@ -33,7 +33,7 @@ var ( // outside this package except by tests. IDGen IDGenerator - db *channelMap = newChannelMap() + db = newChannelMap() // EntriesPerPage defines the number of channelz entries to be shown on a web page. EntriesPerPage = 50 curState int32 diff --git a/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go index d1ed8df6a5..0e6e18e185 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go +++ b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go @@ -35,13 +35,13 @@ type SocketOptionData struct { // Getsockopt defines the function to get socket options requested by channelz. // It is to be passed to syscall.RawConn.Control(). // Windows OS doesn't support Socket Option -func (s *SocketOptionData) Getsockopt(fd uintptr) { +func (s *SocketOptionData) Getsockopt(uintptr) { once.Do(func() { logger.Warning("Channelz: socket options are not supported on non-linux environments") }) } // GetSocketOption gets the socket option info of the conn. -func GetSocketOption(c any) *SocketOptionData { +func GetSocketOption(any) *SocketOptionData { return nil } diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 00abc7c2be..452985f8d8 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -45,7 +45,7 @@ var ( // option is present for backward compatibility. This option may be overridden // by setting the environment variable "GRPC_ENFORCE_ALPN_ENABLED" to "true" // or "false". - EnforceALPNEnabled = boolFromEnv("GRPC_ENFORCE_ALPN_ENABLED", false) + EnforceALPNEnabled = boolFromEnv("GRPC_ENFORCE_ALPN_ENABLED", true) // XDSFallbackSupport is the env variable that controls whether support for // xDS fallback is turned on. If this is unset or is false, only the first // xDS server in the list of server configs will be used. diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 73fa407b6c..7aae9240ff 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -183,7 +183,7 @@ var ( // GRPCResolverSchemeExtraMetadata determines when gRPC will add extra // metadata to RPCs. - GRPCResolverSchemeExtraMetadata string = "xds" + GRPCResolverSchemeExtraMetadata = "xds" // EnterIdleModeForTesting gets the ClientConn to enter IDLE mode. EnterIdleModeForTesting any // func(*grpc.ClientConn) @@ -203,7 +203,7 @@ var ( // UserSetDefaultScheme is set to true if the user has overridden the // default resolver scheme. - UserSetDefaultScheme bool = false + UserSetDefaultScheme = false // ShuffleAddressListForTesting pseudo-randomizes the order of addresses. n // is the number of elements. swap swaps the elements with indexes i and j. diff --git a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go index afac56572a..b901c7bace 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go +++ b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go @@ -55,7 +55,7 @@ func (r *passthroughResolver) start() { r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint()}}}) } -func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {} +func (*passthroughResolver) ResolveNow(resolver.ResolveNowOptions) {} func (*passthroughResolver) Close() {} diff --git a/vendor/google.golang.org/grpc/internal/status/status.go b/vendor/google.golang.org/grpc/internal/status/status.go index c7dbc82059..757925381f 100644 --- a/vendor/google.golang.org/grpc/internal/status/status.go +++ b/vendor/google.golang.org/grpc/internal/status/status.go @@ -138,11 +138,11 @@ func (s *Status) WithDetails(details ...protoadapt.MessageV1) (*Status, error) { // s.Code() != OK implies that s.Proto() != nil. p := s.Proto() for _, detail := range details { - any, err := anypb.New(protoadapt.MessageV2Of(detail)) + m, err := anypb.New(protoadapt.MessageV2Of(detail)) if err != nil { return nil, err } - p.Details = append(p.Details, any) + p.Details = append(p.Details, m) } return &Status{s: p}, nil } diff --git a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go index 999f52cd75..54c24c2ff3 100644 --- a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go +++ b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go @@ -58,20 +58,20 @@ func GetRusage() *Rusage { // CPUTimeDiff returns the differences of user CPU time and system CPU time used // between two Rusage structs. It a no-op function for non-linux environments. -func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) { +func CPUTimeDiff(*Rusage, *Rusage) (float64, float64) { log() return 0, 0 } // SetTCPUserTimeout is a no-op function under non-linux environments. -func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error { +func SetTCPUserTimeout(net.Conn, time.Duration) error { log() return nil } // GetTCPUserTimeout is a no-op function under non-linux environments. // A negative return value indicates the operation is not supported -func GetTCPUserTimeout(conn net.Conn) (int, error) { +func GetTCPUserTimeout(net.Conn) (int, error) { log() return -1, nil } diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go index ea0633bbda..ef72fbb3a0 100644 --- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go +++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go @@ -1033,10 +1033,3 @@ func (l *loopyWriter) processData() (bool, error) { } return false, nil } - -func min(a, b int) int { - if a < b { - return a - } - return b -} diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index e1cd86b2fc..ce878693bd 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -333,7 +333,7 @@ func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) { s.hdrMu.Unlock() } -func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error { +func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data mem.BufferSlice, _ *Options) error { // Always take a reference because otherwise there is no guarantee the data will // be available after this function returns. This is what callers to Write // expect. @@ -475,7 +475,7 @@ func (ht *serverHandlerTransport) IncrMsgSent() {} func (ht *serverHandlerTransport) IncrMsgRecv() {} -func (ht *serverHandlerTransport) Drain(debugData string) { +func (ht *serverHandlerTransport) Drain(string) { panic("Drain() is not implemented") } diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index f46194fdc6..c769deab53 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -772,7 +772,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, hdr := &headerFrame{ hf: headerFields, endStream: false, - initStream: func(id uint32) error { + initStream: func(uint32) error { t.mu.Lock() // TODO: handle transport closure in loopy instead and remove this // initStream is never called when transport is draining. @@ -1667,11 +1667,10 @@ func (t *http2Client) reader(errCh chan<- error) { t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false) } continue - } else { - // Transport error. - t.Close(connectionErrorf(true, err, "error reading from server: %v", err)) - return } + // Transport error. + t.Close(connectionErrorf(true, err, "error reading from server: %v", err)) + return } switch frame := frame.(type) { case *http2.MetaHeadersFrame: @@ -1696,13 +1695,6 @@ func (t *http2Client) reader(errCh chan<- error) { } } -func minTime(a, b time.Duration) time.Duration { - if a < b { - return a - } - return b -} - // keepalive running in a separate goroutine makes sure the connection is alive by sending pings. func (t *http2Client) keepalive() { p := &ping{data: [8]byte{}} @@ -1770,7 +1762,7 @@ func (t *http2Client) keepalive() { // timeoutLeft. This will ensure that we wait only for kp.Time // before sending out the next ping (for cases where the ping is // acked). - sleepDuration := minTime(t.kp.Time, timeoutLeft) + sleepDuration := min(t.kp.Time, timeoutLeft) timeoutLeft -= sleepDuration timer.Reset(sleepDuration) case <-t.ctx.Done(): diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index f5163f770c..584b50fe55 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -1117,7 +1117,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { // Write converts the data into HTTP2 data frame and sends it out. Non-nil error // is returns if it fails (e.g., framing error, transport error). -func (t *http2Server) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error { +func (t *http2Server) Write(s *Stream, hdr []byte, data mem.BufferSlice, _ *Options) error { reader := data.Reader() if !s.isHeaderSent() { // Headers haven't been written yet. @@ -1238,7 +1238,7 @@ func (t *http2Server) keepalive() { // timeoutLeft. This will ensure that we wait only for kp.Time // before sending out the next ping (for cases where the ping is // acked). - sleepDuration := minTime(t.kp.Time, kpTimeoutLeft) + sleepDuration := min(t.kp.Time, kpTimeoutLeft) kpTimeoutLeft -= sleepDuration kpTimer.Reset(sleepDuration) case <-t.done: diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go index f609c6c665..3613d7b648 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http_util.go +++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go @@ -393,7 +393,7 @@ type framer struct { fr *http2.Framer } -var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) +var writeBufferPoolMap = make(map[int]*sync.Pool) var writeBufferMutex sync.Mutex func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer { diff --git a/vendor/google.golang.org/grpc/keepalive/keepalive.go b/vendor/google.golang.org/grpc/keepalive/keepalive.go index 34d31b5e7d..eb42b19fb9 100644 --- a/vendor/google.golang.org/grpc/keepalive/keepalive.go +++ b/vendor/google.golang.org/grpc/keepalive/keepalive.go @@ -34,15 +34,29 @@ type ClientParameters struct { // After a duration of this time if the client doesn't see any activity it // pings the server to see if the transport is still alive. // If set below 10s, a minimum value of 10s will be used instead. - Time time.Duration // The current default value is infinity. + // + // Note that gRPC servers have a default EnforcementPolicy.MinTime of 5 + // minutes (which means the client shouldn't ping more frequently than every + // 5 minutes). + // + // Though not ideal, it's not a strong requirement for Time to be less than + // EnforcementPolicy.MinTime. Time will automatically double if the server + // disconnects due to its enforcement policy. + // + // For more details, see + // https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md + Time time.Duration // After having pinged for keepalive check, the client waits for a duration // of Timeout and if no activity is seen even after that the connection is // closed. - Timeout time.Duration // The current default value is 20 seconds. + // + // If keepalive is enabled, and this value is not explicitly set, the default + // is 20 seconds. + Timeout time.Duration // If true, client sends keepalive pings even with no active RPCs. If false, // when there are no active RPCs, Time and Timeout will be ignored and no // keepalive pings will be sent. - PermitWithoutStream bool // false by default. + PermitWithoutStream bool } // ServerParameters is used to set keepalive and max-age parameters on the diff --git a/vendor/google.golang.org/grpc/mem/buffers.go b/vendor/google.golang.org/grpc/mem/buffers.go index 975ceb7185..4d66b2ccc2 100644 --- a/vendor/google.golang.org/grpc/mem/buffers.go +++ b/vendor/google.golang.org/grpc/mem/buffers.go @@ -224,11 +224,11 @@ func (e emptyBuffer) Len() int { return 0 } -func (e emptyBuffer) split(n int) (left, right Buffer) { +func (e emptyBuffer) split(int) (left, right Buffer) { return e, e } -func (e emptyBuffer) read(buf []byte) (int, Buffer) { +func (e emptyBuffer) read([]byte) (int, Buffer) { return 0, e } diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index db8865ec3f..2d96f1405e 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -220,8 +220,8 @@ type HeaderCallOption struct { HeaderAddr *metadata.MD } -func (o HeaderCallOption) before(c *callInfo) error { return nil } -func (o HeaderCallOption) after(c *callInfo, attempt *csAttempt) { +func (o HeaderCallOption) before(*callInfo) error { return nil } +func (o HeaderCallOption) after(_ *callInfo, attempt *csAttempt) { *o.HeaderAddr, _ = attempt.s.Header() } @@ -242,8 +242,8 @@ type TrailerCallOption struct { TrailerAddr *metadata.MD } -func (o TrailerCallOption) before(c *callInfo) error { return nil } -func (o TrailerCallOption) after(c *callInfo, attempt *csAttempt) { +func (o TrailerCallOption) before(*callInfo) error { return nil } +func (o TrailerCallOption) after(_ *callInfo, attempt *csAttempt) { *o.TrailerAddr = attempt.s.Trailer() } @@ -264,8 +264,8 @@ type PeerCallOption struct { PeerAddr *peer.Peer } -func (o PeerCallOption) before(c *callInfo) error { return nil } -func (o PeerCallOption) after(c *callInfo, attempt *csAttempt) { +func (o PeerCallOption) before(*callInfo) error { return nil } +func (o PeerCallOption) after(_ *callInfo, attempt *csAttempt) { if x, ok := peer.FromContext(attempt.s.Context()); ok { *o.PeerAddr = *x } @@ -304,7 +304,7 @@ func (o FailFastCallOption) before(c *callInfo) error { c.failFast = o.FailFast return nil } -func (o FailFastCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o FailFastCallOption) after(*callInfo, *csAttempt) {} // OnFinish returns a CallOption that configures a callback to be called when // the call completes. The error passed to the callback is the status of the @@ -339,7 +339,7 @@ func (o OnFinishCallOption) before(c *callInfo) error { return nil } -func (o OnFinishCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o OnFinishCallOption) after(*callInfo, *csAttempt) {} // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size // in bytes the client can receive. If this is not set, gRPC uses the default @@ -363,7 +363,7 @@ func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error { c.maxReceiveMessageSize = &o.MaxRecvMsgSize return nil } -func (o MaxRecvMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o MaxRecvMsgSizeCallOption) after(*callInfo, *csAttempt) {} // MaxCallSendMsgSize returns a CallOption which sets the maximum message size // in bytes the client can send. If this is not set, gRPC uses the default @@ -387,7 +387,7 @@ func (o MaxSendMsgSizeCallOption) before(c *callInfo) error { c.maxSendMessageSize = &o.MaxSendMsgSize return nil } -func (o MaxSendMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o MaxSendMsgSizeCallOption) after(*callInfo, *csAttempt) {} // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials // for a call. @@ -410,7 +410,7 @@ func (o PerRPCCredsCallOption) before(c *callInfo) error { c.creds = o.Creds return nil } -func (o PerRPCCredsCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o PerRPCCredsCallOption) after(*callInfo, *csAttempt) {} // UseCompressor returns a CallOption which sets the compressor used when // sending the request. If WithCompressor is also set, UseCompressor has @@ -438,7 +438,7 @@ func (o CompressorCallOption) before(c *callInfo) error { c.compressorType = o.CompressorType return nil } -func (o CompressorCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o CompressorCallOption) after(*callInfo, *csAttempt) {} // CallContentSubtype returns a CallOption that will set the content-subtype // for a call. For example, if content-subtype is "json", the Content-Type over @@ -475,7 +475,7 @@ func (o ContentSubtypeCallOption) before(c *callInfo) error { c.contentSubtype = o.ContentSubtype return nil } -func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o ContentSubtypeCallOption) after(*callInfo, *csAttempt) {} // ForceCodec returns a CallOption that will set codec to be used for all // request and response messages for a call. The result of calling Name() will @@ -514,7 +514,7 @@ func (o ForceCodecCallOption) before(c *callInfo) error { c.codec = newCodecV1Bridge(o.Codec) return nil } -func (o ForceCodecCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o ForceCodecCallOption) after(*callInfo, *csAttempt) {} // ForceCodecV2 returns a CallOption that will set codec to be used for all // request and response messages for a call. The result of calling Name() will @@ -554,7 +554,7 @@ func (o ForceCodecV2CallOption) before(c *callInfo) error { return nil } -func (o ForceCodecV2CallOption) after(c *callInfo, attempt *csAttempt) {} +func (o ForceCodecV2CallOption) after(*callInfo, *csAttempt) {} // CallCustomCodec behaves like ForceCodec, but accepts a grpc.Codec instead of // an encoding.Codec. @@ -579,7 +579,7 @@ func (o CustomCodecCallOption) before(c *callInfo) error { c.codec = newCodecV0Bridge(o.Codec) return nil } -func (o CustomCodecCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o CustomCodecCallOption) after(*callInfo, *csAttempt) {} // MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory // used for buffering this RPC's requests for retry purposes. @@ -607,7 +607,7 @@ func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error { c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize return nil } -func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o MaxRetryRPCBufferSizeCallOption) after(*callInfo, *csAttempt) {} // The format of the payload: compressed or not? type payloadFormat uint8 diff --git a/vendor/google.golang.org/grpc/stream_interfaces.go b/vendor/google.golang.org/grpc/stream_interfaces.go index 8b813529c0..0037fee0bd 100644 --- a/vendor/google.golang.org/grpc/stream_interfaces.go +++ b/vendor/google.golang.org/grpc/stream_interfaces.go @@ -22,15 +22,35 @@ package grpc // request, many responses) RPC. It is generic over the type of the response // message. It is used in generated code. type ServerStreamingClient[Res any] interface { + // Recv receives the next response message from the server. The client may + // repeatedly call Recv to read messages from the response stream. If + // io.EOF is returned, the stream has terminated with an OK status. Any + // other error is compatible with the status package and indicates the + // RPC's status code and message. Recv() (*Res, error) + + // ClientStream is embedded to provide Context, Header, and Trailer + // functionality. No other methods in the ClientStream should be called + // directly. ClientStream } // ServerStreamingServer represents the server side of a server-streaming (one // request, many responses) RPC. It is generic over the type of the response // message. It is used in generated code. +// +// To terminate the response stream, return from the handler method and return +// an error from the status package, or use nil to indicate an OK status code. type ServerStreamingServer[Res any] interface { + // Send sends a response message to the client. The server handler may + // call Send multiple times to send multiple messages to the client. An + // error is returned if the stream was terminated unexpectedly, and the + // handler method should return, as the stream is no longer usable. Send(*Res) error + + // ServerStream is embedded to provide Context, SetHeader, SendHeader, and + // SetTrailer functionality. No other methods in the ServerStream should + // be called directly. ServerStream } @@ -39,8 +59,22 @@ type ServerStreamingServer[Res any] interface { // message stream and the type of the unary response message. It is used in // generated code. type ClientStreamingClient[Req any, Res any] interface { + // Send sends a request message to the server. The client may call Send + // multiple times to send multiple messages to the server. On error, Send + // aborts the stream. If the error was generated by the client, the status + // is returned directly. Otherwise, io.EOF is returned, and the status of + // the stream may be discovered using CloseAndRecv(). Send(*Req) error + + // CloseAndRecv closes the request stream and waits for the server's + // response. This method must be called once and only once after sending + // all request messages. Any error returned is implemented by the status + // package. CloseAndRecv() (*Res, error) + + // ClientStream is embedded to provide Context, Header, and Trailer + // functionality. No other methods in the ClientStream should be called + // directly. ClientStream } @@ -48,9 +82,28 @@ type ClientStreamingClient[Req any, Res any] interface { // requests, one response) RPC. It is generic over both the type of the request // message stream and the type of the unary response message. It is used in // generated code. +// +// To terminate the RPC, call SendAndClose and return nil from the method +// handler or do not call SendAndClose and return an error from the status +// package. type ClientStreamingServer[Req any, Res any] interface { + // Recv receives the next request message from the client. The server may + // repeatedly call Recv to read messages from the request stream. If + // io.EOF is returned, it indicates the client called CloseAndRecv on its + // ClientStreamingClient. Any other error indicates the stream was + // terminated unexpectedly, and the handler method should return, as the + // stream is no longer usable. Recv() (*Req, error) + + // SendAndClose sends a single response message to the client and closes + // the stream. This method must be called once and only once after all + // request messages have been processed. Recv should not be called after + // calling SendAndClose. SendAndClose(*Res) error + + // ServerStream is embedded to provide Context, SetHeader, SendHeader, and + // SetTrailer functionality. No other methods in the ServerStream should + // be called directly. ServerStream } @@ -59,8 +112,23 @@ type ClientStreamingServer[Req any, Res any] interface { // request message stream and the type of the response message stream. It is // used in generated code. type BidiStreamingClient[Req any, Res any] interface { + // Send sends a request message to the server. The client may call Send + // multiple times to send multiple messages to the server. On error, Send + // aborts the stream. If the error was generated by the client, the status + // is returned directly. Otherwise, io.EOF is returned, and the status of + // the stream may be discovered using Recv(). Send(*Req) error + + // Recv receives the next response message from the server. The client may + // repeatedly call Recv to read messages from the response stream. If + // io.EOF is returned, the stream has terminated with an OK status. Any + // other error is compatible with the status package and indicates the + // RPC's status code and message. Recv() (*Res, error) + + // ClientStream is embedded to provide Context, Header, Trailer, and + // CloseSend functionality. No other methods in the ClientStream should be + // called directly. ClientStream } @@ -68,9 +136,27 @@ type BidiStreamingClient[Req any, Res any] interface { // (many requests, many responses) RPC. It is generic over both the type of the // request message stream and the type of the response message stream. It is // used in generated code. +// +// To terminate the stream, return from the handler method and return +// an error from the status package, or use nil to indicate an OK status code. type BidiStreamingServer[Req any, Res any] interface { + // Recv receives the next request message from the client. The server may + // repeatedly call Recv to read messages from the request stream. If + // io.EOF is returned, it indicates the client called CloseSend on its + // BidiStreamingClient. Any other error indicates the stream was + // terminated unexpectedly, and the handler method should return, as the + // stream is no longer usable. Recv() (*Req, error) + + // Send sends a response message to the client. The server handler may + // call Send multiple times to send multiple messages to the client. An + // error is returned if the stream was terminated unexpectedly, and the + // handler method should return, as the stream is no longer usable. Send(*Res) error + + // ServerStream is embedded to provide Context, SetHeader, SendHeader, and + // SetTrailer functionality. No other methods in the ServerStream should + // be called directly. ServerStream } diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 7c70005d08..187fbf1195 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. -const Version = "1.66.2" +const Version = "1.67.0" diff --git a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go index 88e470d8b4..a9ca7b1d5a 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go @@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro // Scheduler is responsible for placing VPods into real Kubernetes pods type Scheduler interface { // Schedule computes the new set of placements for vpod. - Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) + Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) } // SchedulerFunc type is an adapter to allow the use of // ordinary functions as Schedulers. If f is a function // with the appropriate signature, SchedulerFunc(f) is a // Scheduler that calls f. -type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error) +type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) // Schedule implements the Scheduler interface. -func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) { - return f(vpod) +func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) { + return f(ctx, vpod) } // VPod represents virtual replicas placed into real Kubernetes pods diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go b/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go index 5ec66b2156..ad3a5aaf76 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/eventing/pkg/scheduler" ) @@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool { var zoneName string var err error for _, podID := range feasiblePods { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) - return err == nil, nil - }) + zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) + if err != nil { + continue + } zoneMap[zoneName] = struct{}{} } return len(zoneMap) == int(states.NumZones) diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go index aa84ca996f..44069babe9 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go @@ -22,7 +22,6 @@ import ( "errors" "math" "strconv" - "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" @@ -30,9 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" corev1 "k8s.io/client-go/listers/core/v1" - "knative.dev/pkg/logging" "knative.dev/eventing/pkg/scheduler" @@ -42,7 +39,7 @@ type StateAccessor interface { // State returns the current state (snapshot) about placed vpods // Take into account reserved vreplicas and update `reserved` to reflect // the current state. - State(reserved map[types.NamespacedName]map[string]int32) (*State, error) + State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) } // state provides information about the current scheduling of all vpods @@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool { // stateBuilder reconstruct the state from scratch, by listing vpods type stateBuilder struct { - ctx context.Context - logger *zap.SugaredLogger vpodLister scheduler.VPodLister capacity int32 schedulerPolicy scheduler.SchedulerPolicyType @@ -166,11 +161,9 @@ type stateBuilder struct { } // NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { +func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { return &stateBuilder{ - ctx: ctx, - logger: logging.FromContext(ctx), vpodLister: lister, capacity: podCapacity, schedulerPolicy: schedulerPolicy, @@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche } } -func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) { +func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) { vpods, err := s.vpodLister() if err != nil { return nil, err } - scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{}) + logger := logging.FromContext(ctx).With("subcomponent", "statebuilder") + ctx = logging.WithLogger(ctx, logger) + + scale, err := s.statefulSetCache.GetScale(ctx, s.statefulSetName, metav1.GetOptions{}) if err != nil { - s.logger.Infow("failed to get statefulset", zap.Error(err)) + logger.Infow("failed to get statefulset", zap.Error(err)) return nil, err } @@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ { - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) - return err == nil, nil - }) - - if pod != nil { - if isPodUnschedulable(pod) { - // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. - continue - } - - node, err := s.nodeLister.Get(pod.Spec.NodeName) - if err != nil { - return nil, err - } + pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) + if err != nil { + logger.Warnw("Failed to get pod", zap.Int32("ordinal", podId), zap.Error(err)) + continue + } + if isPodUnschedulable(pod) { + // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. + logger.Debugw("Pod is unschedulable", zap.Any("pod", pod)) + continue + } - if isNodeUnschedulable(node) { - // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. - continue - } + node, err := s.nodeLister.Get(pod.Spec.NodeName) + if err != nil { + return nil, err + } - // Pod has no annotation or not annotated as unschedulable and - // not on an unschedulable node, so add to feasible - schedulablePods.Insert(podId) + if isNodeUnschedulable(node) { + // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. + logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node)) + continue } + + // Pod has no annotation or not annotated as unschedulable and + // not on an unschedulable node, so add to feasible + schedulablePods.Insert(podId) } for _, p := range schedulablePods.List() { - free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) + free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) } // Getting current state from existing placements for all vpods @@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) // Account for reserved vreplicas vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved) - free, last = s.updateFreeCapacity(free, last, podName, vreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas) withPlacement[vpod.GetKey()][podName] = true - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) continue } - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -330,7 +323,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } } - free, last = s.updateFreeCapacity(free, last, podName, rvreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas) } } @@ -338,7 +331,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister, PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} - s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) + logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) return state, nil } @@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 { return int32(math.Max(float64(0), float64(expected-scheduled))) } -func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { +func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { ordinal := OrdinalFromPodName(podName) free = grow(free, ordinal, s.capacity) @@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri // Assert the pod is not overcommitted if free[ordinal] < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. - s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } if ordinal > last { diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go index 296feb16f2..3245dabc16 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go @@ -18,6 +18,7 @@ package statefulset import ( "context" + "fmt" "math" "sync" "sync/atomic" @@ -27,10 +28,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "knative.dev/pkg/reconciler" - "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" "knative.dev/eventing/pkg/scheduler" st "knative.dev/eventing/pkg/scheduler/state" @@ -58,9 +57,8 @@ type autoscaler struct { statefulSetCache *scheduler.ScaleCache statefulSetName string vpodLister scheduler.VPodLister - logger *zap.SugaredLogger stateAccessor st.StateAccessor - trigger chan struct{} + trigger chan context.Context evictor scheduler.Evictor // capacity is the total number of virtual replicas available per pod. @@ -68,7 +66,9 @@ type autoscaler struct { // refreshPeriod is how often the autoscaler tries to scale down the statefulset refreshPeriod time.Duration - lock sync.Locker + // retryPeriod is how often the autoscaler retry failed autoscale operations + retryPeriod time.Duration + lock sync.Locker // isLeader signals whether a given autoscaler instance is leader or not. // The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a @@ -104,17 +104,17 @@ func (a *autoscaler) Demote(b reconciler.Bucket) { } } -func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { - return &autoscaler{ - logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")), +func newAutoscaler(cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { + a := &autoscaler{ statefulSetCache: statefulSetCache, statefulSetName: cfg.StatefulSetName, vpodLister: cfg.VPodLister, stateAccessor: stateAccessor, evictor: cfg.Evictor, - trigger: make(chan struct{}, 1), + trigger: make(chan context.Context, 1), capacity: cfg.PodCapacity, refreshPeriod: cfg.RefreshPeriod, + retryPeriod: cfg.RetryPeriod, lock: new(sync.Mutex), isLeader: atomic.Bool{}, getReserved: cfg.getReserved, @@ -124,25 +124,38 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces Add(-cfg.RefreshPeriod). Add(-time.Minute), } + + if a.retryPeriod == 0 { + a.retryPeriod = time.Second + } + + return a } func (a *autoscaler) Start(ctx context.Context) { attemptScaleDown := false for { + autoscaleCtx := ctx select { case <-ctx.Done(): return case <-time.After(a.refreshPeriod): - a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) + logging.FromContext(ctx).Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = true - case <-a.trigger: - a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) + case autoscaleCtx = <-a.trigger: + logging.FromContext(autoscaleCtx).Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = false } // Retry a few times, just so that we don't have to wait for the next beat when // a transient error occurs - a.syncAutoscale(ctx, attemptScaleDown) + if err := a.syncAutoscale(autoscaleCtx, attemptScaleDown); err != nil { + logging.FromContext(autoscaleCtx).Errorw("Failed to sync autoscale", zap.Error(err)) + go func() { + time.Sleep(a.retryPeriod) + a.Autoscale(ctx) // Use top-level context for background retries + }() + } } } @@ -150,10 +163,10 @@ func (a *autoscaler) Autoscale(ctx context.Context) { select { // We trigger the autoscaler asynchronously by using the channel so that the scale down refresh // period is reset. - case a.trigger <- struct{}{}: + case a.trigger <- ctx: default: // We don't want to block if the channel's buffer is full, it will be triggered eventually. - + logging.FromContext(ctx).Debugw("Skipping autoscale since autoscale is in progress") } } @@ -161,36 +174,34 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) e a.lock.Lock() defer a.lock.Unlock() - var lastErr error - wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) { - err := a.doautoscale(ctx, attemptScaleDown) - if err != nil { - logging.FromContext(ctx).Errorw("Failed to autoscale", zap.Error(err)) - } - lastErr = err - return err == nil, nil - }) - return lastErr + if err := a.doautoscale(ctx, attemptScaleDown); err != nil { + return fmt.Errorf("failed to do autoscale: %w", err) + } + return nil } func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) error { if !a.isLeader.Load() { return nil } - state, err := a.stateAccessor.State(a.getReserved()) + + logger := logging.FromContext(ctx).With("component", "autoscaler") + ctx = logging.WithLogger(ctx, logger) + + state, err := a.stateAccessor.State(ctx, a.getReserved()) if err != nil { - a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) + logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return err } scale, err := a.statefulSetCache.GetScale(ctx, a.statefulSetName, metav1.GetOptions{}) if err != nil { // skip a beat - a.logger.Infow("failed to get scale subresource", zap.Error(err)) + logger.Infow("failed to get scale subresource", zap.Error(err)) return err } - a.logger.Debugw("checking adapter capacity", + logger.Debugw("checking adapter capacity", zap.Int32("replicas", scale.Spec.Replicas), zap.Any("state", state)) @@ -234,43 +245,43 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err if newreplicas != scale.Spec.Replicas { scale.Spec.Replicas = newreplicas - a.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) + logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) _, err = a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{}) if err != nil { - a.logger.Errorw("updating scale subresource failed", zap.Error(err)) + logger.Errorw("updating scale subresource failed", zap.Error(err)) return err } } else if attemptScaleDown { // since the number of replicas hasn't changed and time has approached to scale down, // take the opportunity to compact the vreplicas - a.mayCompact(state, scaleUpFactor) + return a.mayCompact(logger, state, scaleUpFactor) } return nil } -func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { +func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State, scaleUpFactor int32) error { // This avoids a too aggressive scale down by adding a "grace period" based on the refresh // period nextAttempt := a.lastCompactAttempt.Add(a.refreshPeriod) if time.Now().Before(nextAttempt) { - a.logger.Debugw("Compact was retried before refresh period", + logger.Debugw("Compact was retried before refresh period", zap.Time("lastCompactAttempt", a.lastCompactAttempt), zap.Time("nextAttempt", nextAttempt), zap.String("refreshPeriod", a.refreshPeriod.String()), ) - return + return nil } - a.logger.Debugw("Trying to compact and scale down", + logger.Debugw("Trying to compact and scale down", zap.Int32("scaleUpFactor", scaleUpFactor), zap.Any("state", s), ) // when there is only one pod there is nothing to move or number of pods is just enough! if s.LastOrdinal < 1 || len(s.SchedulablePods) <= int(scaleUpFactor) { - return + return nil } if s.SchedulerPolicy == scheduler.MAXFILLUP { @@ -283,7 +294,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } @@ -303,10 +314,11 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } } + return nil } func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { @@ -323,16 +335,14 @@ func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { ordinal := st.OrdinalFromPodName(placements[i].PodName) if ordinal == s.LastOrdinal-j { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - if s.PodLister != nil { - pod, err = s.PodLister.Get(placements[i].PodName) - } - return err == nil, nil - }) + pod, err = s.PodLister.Get(placements[i].PodName) + if err != nil { + return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) + } err = a.evictor(pod, vpod, &placements[i]) if err != nil { - return err + return fmt.Errorf("failed to evict pod %s: %w", pod.Name, err) } } } diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go index 1256e2769d..6995d6ff45 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go @@ -33,11 +33,11 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" + "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/controller" - "knative.dev/pkg/logging" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/scheduler" @@ -67,6 +67,8 @@ type Config struct { PodCapacity int32 `json:"podCapacity"` // Autoscaler refresh period RefreshPeriod time.Duration `json:"refreshPeriod"` + // Autoscaler retry period + RetryPeriod time.Duration `json:"retryPeriod"` SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` @@ -91,14 +93,14 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig) - stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) + stateAccessor := st.NewStateBuilder(cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) var getReserved GetReserved cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { return getReserved() } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) var wg sync.WaitGroup wg.Add(1) @@ -126,8 +128,6 @@ func (p Pending) Total() int32 { // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods type StatefulSetScheduler struct { - ctx context.Context - logger *zap.SugaredLogger statefulSetName string statefulSetNamespace string statefulSetClient clientappsv1.StatefulSetInterface @@ -171,8 +171,6 @@ func newStatefulSetScheduler(ctx context.Context, autoscaler Autoscaler) *StatefulSetScheduler { scheduler := &StatefulSetScheduler{ - ctx: ctx, - logger: logging.FromContext(ctx), statefulSetNamespace: cfg.StatefulSetNamespace, statefulSetName: cfg.StatefulSetName, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), @@ -193,7 +191,9 @@ func newStatefulSetScheduler(ctx context.Context, sif.Apps().V1().StatefulSets().Informer(). AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), - Handler: controller.HandleAll(scheduler.updateStatefulset), + Handler: controller.HandleAll(func(i interface{}) { + scheduler.updateStatefulset(ctx, i) + }), }) sif.Start(ctx.Done()) @@ -207,13 +207,13 @@ func newStatefulSetScheduler(ctx context.Context, return scheduler } -func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { +func (s *StatefulSetScheduler) Schedule(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { s.lock.Lock() defer s.lock.Unlock() s.reservedMu.Lock() defer s.reservedMu.Unlock() - placements, err := s.scheduleVPod(vpod) + placements, err := s.scheduleVPod(ctx, vpod) if placements == nil { return placements, err } @@ -228,11 +228,13 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla return placements, err } -func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { - logger := s.logger.With("key", vpod.GetKey(), zap.String("component", "scheduler")) +func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { + logger := logging.FromContext(ctx).With("key", vpod.GetKey(), zap.String("component", "scheduler")) + ctx = logging.WithLogger(ctx, logger) + // Get the current placements state // Quite an expensive operation but safe and simple. - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Debug("error while refreshing scheduler state (will retry)", zap.Error(err)) return nil, err @@ -270,13 +272,15 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Handle overcommitted pods. - if state.FreeCap[ordinal] < 0 { + if state.Free(ordinal) < 0 { // vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 // vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 // vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 overcommit := -state.FreeCap[ordinal] + logger.Debugw("overcommit", zap.Any("overcommit", overcommit), zap.Any("placement", p)) + if p.VReplicas >= overcommit { state.SetFree(ordinal, 0) state.Pending[vpod.GetKey()] += overcommit @@ -313,7 +317,9 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 if state.SchedulerPolicy != "" { // Need less => scale down if tr > vpod.GetVReplicas() { - logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) @@ -323,15 +329,19 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Need more => scale up - logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) } else { //Predicates and priorities must be used for scheduling // Need less => scale down if tr > vpod.GetVReplicas() && state.DeschedPolicy != nil { - logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements = s.removeReplicasWithPolicy(vpod, tr-vpod.GetVReplicas(), placements) + logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements = s.removeReplicasWithPolicy(ctx, vpod, tr-vpod.GetVReplicas(), placements) // Do not trigger the autoscaler to avoid unnecessary churn @@ -343,8 +353,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Need more => scale up // rebalancing needed for all vreps most likely since there are pending vreps from previous reconciliation // can fall here when vreps scaled up or after eviction - logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements, left = s.rebalanceReplicasWithPolicy(vpod, vpod.GetVReplicas(), placements) + logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements, left = s.rebalanceReplicasWithPolicy(ctx, vpod, vpod.GetVReplicas(), placements) } } @@ -355,10 +367,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Trigger the autoscaler if s.autoscaler != nil { logger.Infow("Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left)) - s.autoscaler.Autoscale(s.ctx) + s.autoscaler.Autoscale(ctx) } - if state.SchedPolicy != nil { + if state.SchedulerPolicy == "" && state.SchedPolicy != nil { logger.Info("reverting to previous placements") s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job @@ -380,25 +392,25 @@ func toJSONable(pending map[types.NamespacedName]int32) map[string]int32 { return r } -func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { s.makeZeroPlacements(vpod, placements) - placements, diff = s.addReplicasWithPolicy(vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list + placements, diff = s.addReplicasWithPolicy(ctx, vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list return placements, diff } -func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { - logger := s.logger.Named("remove replicas with policy") +func (s *StatefulSetScheduler) removeReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { + logger := logging.FromContext(ctx).Named("remove replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //deschedule one vreplica at a time - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.DeschedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.DeschedPolicy) feasiblePods = s.removePodsNotInPlacement(vpod, feasiblePods) if len(feasiblePods) == 1 { //nothing to score, remove vrep from that pod placementPodID := feasiblePods[0] @@ -409,7 +421,7 @@ func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, dif continue } - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.DeschedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.DeschedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -455,13 +467,13 @@ func (s *StatefulSetScheduler) removeSelectionFromPlacements(placementPodID int3 return newPlacements } -func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { - logger := s.logger.Named("add replicas with policy") +func (s *StatefulSetScheduler) addReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { + logger := logging.FromContext(ctx).Named("add replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //schedule one vreplica at a time (find most suitable pod placement satisying predicates with high score) // Get the current placements state - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements, diff @@ -474,7 +486,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i break //end the iteration for all vreps since there are not pods } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.SchedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.SchedPolicy) if len(feasiblePods) == 0 { //no pods available to schedule this vreplica logger.Info("no feasible pods available to schedule this vreplica") s.reservePlacements(vpod, placements) @@ -492,7 +504,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i continue } */ - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.SchedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.SchedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -567,7 +579,7 @@ func (s *StatefulSetScheduler) removePodsNotInPlacement(vpod scheduler.VPod, fea // prioritizePods prioritizes the pods by running the score plugins, which return a score for each pod. // The scores from each plugin are added together to make the score for that pod. func (s *StatefulSetScheduler) prioritizePods(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PodScoreList, error) { - logger := s.logger.Named("prioritize all feasible pods") + logger := logging.FromContext(ctx).Named("prioritize all feasible pods") // If no priority configs are provided, then all pods will have a score of one result := make(st.PodScoreList, 0, len(feasiblePods)) @@ -630,7 +642,7 @@ func (s *StatefulSetScheduler) selectPod(podScoreList st.PodScoreList) (int32, e // If any of these plugins doesn't return "Success", the pod is not suitable for placing the vrep. // Meanwhile, the failure message and status are set for the given pod. func (s *StatefulSetScheduler) RunFilterPlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, podID int32, policy *scheduler.SchedulerPolicy) st.PluginToStatus { - logger := s.logger.Named("run all filter plugins") + logger := logging.FromContext(ctx).Named("run all filter plugins") statuses := make(st.PluginToStatus) for _, plugin := range policy.Predicates { @@ -663,7 +675,7 @@ func (s *StatefulSetScheduler) runFilterPlugin(ctx context.Context, pl st.Filter // RunScorePlugins runs the set of configured scoring plugins. It returns a list that stores for each scoring plugin name the corresponding PodScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns a non-success status. func (s *StatefulSetScheduler) RunScorePlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PluginToPodScores, *st.Status) { - logger := s.logger.Named("run all score plugins") + logger := logging.FromContext(ctx).Named("run all score plugins") pluginToPodScores := make(st.PluginToPodScores, len(policy.Priorities)) for _, plugin := range policy.Priorities { @@ -776,10 +788,11 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placeme return newPlacements, diff } -func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { +func (s *StatefulSetScheduler) updateStatefulset(ctx context.Context, obj interface{}) { statefulset, ok := obj.(*appsv1.StatefulSet) if !ok { - s.logger.Fatalw("expected a Statefulset object", zap.Any("object", obj)) + logging.FromContext(ctx).Warnw("expected a Statefulset object", zap.Any("object", obj)) + return } s.lock.Lock() @@ -789,7 +802,7 @@ func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { s.replicas = 1 } else if s.replicas != *statefulset.Spec.Replicas { s.replicas = *statefulset.Spec.Replicas - s.logger.Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) + logging.FromContext(ctx).Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) } } diff --git a/vendor/knative.dev/eventing/test/upgrade/prober/forwarder.go b/vendor/knative.dev/eventing/test/upgrade/prober/forwarder.go index a7ee1d1ab6..1e96d7168f 100644 --- a/vendor/knative.dev/eventing/test/upgrade/prober/forwarder.go +++ b/vendor/knative.dev/eventing/test/upgrade/prober/forwarder.go @@ -20,6 +20,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/duck" "knative.dev/eventing/test/lib/resources" @@ -69,6 +70,9 @@ func (p *prober) forwarderKService(name, namespace string) *unstructured.Unstruc "spec": map[string]interface{}{ "template": map[string]interface{}{ "metadata": map[string]interface{}{ + "labels": map[string]interface{}{ + "sidecar.istio.io/inject": "true", + }, "annotations": map[string]interface{}{ "sidecar.istio.io/inject": "true", "sidecar.istio.io/rewriteAppHTTPProbers": "true", diff --git a/vendor/knative.dev/eventing/test/upgrade/prober/receiver.go b/vendor/knative.dev/eventing/test/upgrade/prober/receiver.go index ddd75fd664..47c574e2d8 100644 --- a/vendor/knative.dev/eventing/test/upgrade/prober/receiver.go +++ b/vendor/knative.dev/eventing/test/upgrade/prober/receiver.go @@ -91,7 +91,8 @@ func (p *prober) createReceiverDeployment() *appsv1.Deployment { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - "app": receiver.Name, + "app": receiver.Name, + "sidecar.istio.io/inject": "true", }, Annotations: map[string]string{ "sidecar.istio.io/inject": "true", diff --git a/vendor/knative.dev/eventing/test/upgrade/prober/sender.go b/vendor/knative.dev/eventing/test/upgrade/prober/sender.go index df9173a1f0..4e1a26bc2c 100644 --- a/vendor/knative.dev/eventing/test/upgrade/prober/sender.go +++ b/vendor/knative.dev/eventing/test/upgrade/prober/sender.go @@ -52,7 +52,8 @@ func (p *prober) deploySender() { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - "app": sender.Name, + "app": sender.Name, + "sidecar.istio.io/inject": "true", }, Annotations: map[string]string{ "sidecar.istio.io/inject": "true", diff --git a/vendor/knative.dev/eventing/test/upgrade/prober/verify.go b/vendor/knative.dev/eventing/test/upgrade/prober/verify.go index 9810daf8cc..069e3a1821 100644 --- a/vendor/knative.dev/eventing/test/upgrade/prober/verify.go +++ b/vendor/knative.dev/eventing/test/upgrade/prober/verify.go @@ -255,7 +255,8 @@ func (p *prober) deployFetcher() *batchv1.Job { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - "app": fetcherName, + "app": fetcherName, + "sidecar.istio.io/inject": "true", }, Annotations: map[string]string{ "sidecar.istio.io/inject": "true", diff --git a/vendor/knative.dev/pkg/test/crd.go b/vendor/knative.dev/pkg/test/crd.go index eb7c0e0b3d..941ec7c21e 100644 --- a/vendor/knative.dev/pkg/test/crd.go +++ b/vendor/knative.dev/pkg/test/crd.go @@ -74,9 +74,9 @@ func CoreV1ObjectReference(kind, apiversion, name string) *corev1.ObjectReferenc func NginxPod(namespace string) *corev1.Pod { return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: nginxName, - Namespace: namespace, - Annotations: map[string]string{"sidecar.istio.io/inject": "true"}, + Name: nginxName, + Namespace: namespace, + Labels: map[string]string{"sidecar.istio.io/inject": "true"}, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ diff --git a/vendor/knative.dev/pkg/webhook/resourcesemantics/defaulting/controller.go b/vendor/knative.dev/pkg/webhook/resourcesemantics/defaulting/controller.go index f318d02316..ed55fd4708 100644 --- a/vendor/knative.dev/pkg/webhook/resourcesemantics/defaulting/controller.go +++ b/vendor/knative.dev/pkg/webhook/resourcesemantics/defaulting/controller.go @@ -100,9 +100,10 @@ func newController(ctx context.Context, name string, optsFunc ...OptionFunc) *co handlers: opts.types, callbacks: opts.callbacks, - withContext: opts.wc, - disallowUnknownFields: opts.disallowUnknownFields, - secretName: wopts.SecretName, + withContext: opts.wc, + disallowUnknownFields: opts.disallowUnknownFields, + secretName: wopts.SecretName, + disableNamespaceOwnership: wopts.DisableNamespaceOwnership, client: client, mwhlister: mwhInformer.Lister(), diff --git a/vendor/knative.dev/pkg/webhook/resourcesemantics/defaulting/defaulting.go b/vendor/knative.dev/pkg/webhook/resourcesemantics/defaulting/defaulting.go index 4140ec7192..6aa08b4b94 100644 --- a/vendor/knative.dev/pkg/webhook/resourcesemantics/defaulting/defaulting.go +++ b/vendor/knative.dev/pkg/webhook/resourcesemantics/defaulting/defaulting.go @@ -69,8 +69,9 @@ type reconciler struct { mwhlister admissionlisters.MutatingWebhookConfigurationLister secretlister corelisters.SecretLister - disallowUnknownFields bool - secretName string + disallowUnknownFields bool + secretName string + disableNamespaceOwnership bool } // CallbackFunc is the function to be invoked. @@ -218,12 +219,14 @@ func (ac *reconciler) reconcileMutatingWebhook(ctx context.Context, caCert []byt current := configuredWebhook.DeepCopy() - ns, err := ac.client.CoreV1().Namespaces().Get(ctx, system.Namespace(), metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to fetch namespace: %w", err) + if !ac.disableNamespaceOwnership { + ns, err := ac.client.CoreV1().Namespaces().Get(ctx, system.Namespace(), metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch namespace: %w", err) + } + nsRef := *metav1.NewControllerRef(ns, corev1.SchemeGroupVersion.WithKind("Namespace")) + current.OwnerReferences = []metav1.OwnerReference{nsRef} } - nsRef := *metav1.NewControllerRef(ns, corev1.SchemeGroupVersion.WithKind("Namespace")) - current.OwnerReferences = []metav1.OwnerReference{nsRef} for i, wh := range current.Webhooks { if wh.Name != current.Name { diff --git a/vendor/knative.dev/pkg/webhook/resourcesemantics/validation/controller.go b/vendor/knative.dev/pkg/webhook/resourcesemantics/validation/controller.go index f24b36792a..c8afa5c138 100644 --- a/vendor/knative.dev/pkg/webhook/resourcesemantics/validation/controller.go +++ b/vendor/knative.dev/pkg/webhook/resourcesemantics/validation/controller.go @@ -86,9 +86,10 @@ func newController(ctx context.Context, name string, optsFunc ...OptionFunc) *co handlers: opts.types, callbacks: opts.callbacks, - withContext: opts.wc, - disallowUnknownFields: opts.DisallowUnknownFields(), - secretName: woptions.SecretName, + withContext: opts.wc, + disallowUnknownFields: opts.DisallowUnknownFields(), + secretName: woptions.SecretName, + disableNamespaceOwnership: woptions.DisableNamespaceOwnership, client: client, vwhlister: vwhInformer.Lister(), diff --git a/vendor/knative.dev/pkg/webhook/resourcesemantics/validation/reconcile_config.go b/vendor/knative.dev/pkg/webhook/resourcesemantics/validation/reconcile_config.go index afbc45c051..9f3114d4c6 100644 --- a/vendor/knative.dev/pkg/webhook/resourcesemantics/validation/reconcile_config.go +++ b/vendor/knative.dev/pkg/webhook/resourcesemantics/validation/reconcile_config.go @@ -60,8 +60,9 @@ type reconciler struct { vwhlister admissionlisters.ValidatingWebhookConfigurationLister secretlister corelisters.SecretLister - disallowUnknownFields bool - secretName string + disallowUnknownFields bool + secretName string + disableNamespaceOwnership bool } var ( @@ -193,13 +194,15 @@ func (ac *reconciler) reconcileValidatingWebhook(ctx context.Context, caCert []b current := configuredWebhook.DeepCopy() - // Set the owner to namespace. - ns, err := ac.client.CoreV1().Namespaces().Get(ctx, system.Namespace(), metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to fetch namespace: %w", err) + if !ac.disableNamespaceOwnership { + // Set the owner to namespace. + ns, err := ac.client.CoreV1().Namespaces().Get(ctx, system.Namespace(), metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch namespace: %w", err) + } + nsRef := *metav1.NewControllerRef(ns, corev1.SchemeGroupVersion.WithKind("Namespace")) + current.OwnerReferences = []metav1.OwnerReference{nsRef} } - nsRef := *metav1.NewControllerRef(ns, corev1.SchemeGroupVersion.WithKind("Namespace")) - current.OwnerReferences = []metav1.OwnerReference{nsRef} for i, wh := range current.Webhooks { if wh.Name != current.Name { diff --git a/vendor/knative.dev/pkg/webhook/webhook.go b/vendor/knative.dev/pkg/webhook/webhook.go index e05c6f041e..1b90e75fca 100644 --- a/vendor/knative.dev/pkg/webhook/webhook.go +++ b/vendor/knative.dev/pkg/webhook/webhook.go @@ -81,6 +81,10 @@ type Options struct { // before shutting down. GracePeriod time.Duration + // DisableNamespaceOwnership configures whether the webhook adds an owner reference for the SYSTEM_NAMESPACE + // Disabling this is useful when you expect the webhook configuration to be managed by something other than knative + DisableNamespaceOwnership bool + // ControllerOptions encapsulates options for creating a new controller, // including throttling and stats behavior. ControllerOptions *controller.ControllerOptions diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml b/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml index 83b2c55fba..c780e86ab6 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml @@ -19,6 +19,9 @@ metadata: namespace: {{ .namespace }} labels: app: eventshub-{{ .name }} + {{ range $key, $value := .labels }} + {{ $key }}: "{{ $value }}" + {{ end }} {{ if .annotations }} annotations: {{ range $key, $value := .annotations }} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/104-forwarder.yaml b/vendor/knative.dev/reconciler-test/pkg/eventshub/104-forwarder.yaml index e014d5fb9b..a34e713249 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/104-forwarder.yaml +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/104-forwarder.yaml @@ -25,12 +25,20 @@ metadata: {{ end }} spec: template: - {{ if .podannotations }} + {{ if or .podannotations .podlabels }} metadata: + {{ if .podannotations }} annotations: {{ range $key, $value := .podannotations }} {{ $key }}: "{{ $value }}" - {{ end }} + {{ end }} + {{ end }} + {{ if .podlabels }} + labels: + {{ range $key, $value := .podlabels }} + {{ $key }}: "{{ $value }}" + {{ end }} + {{ end }} {{ end }} spec: serviceAccountName: "{{ .name }}" diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go index d87e2f5ffb..c34f55c8f1 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go @@ -168,6 +168,7 @@ func Install(name string, options ...EventsHubOption) feature.StepFn { if ic := environment.GetIstioConfig(ctx); ic.Enabled { manifest.WithIstioPodAnnotations(cfg) + manifest.WithIstioPodLabels(cfg) } manifest.PodSecurityCfgFn(ctx, t)(cfg) diff --git a/vendor/knative.dev/reconciler-test/pkg/manifest/manifest.go b/vendor/knative.dev/reconciler-test/pkg/manifest/manifest.go index 2fc8f700e9..8276d51c8b 100644 --- a/vendor/knative.dev/reconciler-test/pkg/manifest/manifest.go +++ b/vendor/knative.dev/reconciler-test/pkg/manifest/manifest.go @@ -86,10 +86,14 @@ func (f *YamlManifest) Apply(spec *unstructured.Unstructured) error { if err != nil { return err } + gvr, _ := meta.UnsafeGuessKindToResource(spec.GroupVersionKind()) if current == nil { f.log.Info("Creating type ", spec.GroupVersionKind(), " name ", spec.GetName()) - gvr, _ := meta.UnsafeGuessKindToResource(spec.GroupVersionKind()) if _, err := f.client.Resource(gvr).Namespace(spec.GetNamespace()).Create(context.Background(), spec, v1.CreateOptions{}); err != nil { + // We might be applying the same resource in parallel, in that case, update the resource. + if errors.IsAlreadyExists(err) { + return f.Apply(spec) + } return fmt.Errorf("failed to create resource %v - Resource:\n%s", err, toYaml(spec)) } } else { @@ -97,7 +101,6 @@ func (f *YamlManifest) Apply(spec *unstructured.Unstructured) error { if UpdateChanged(spec.UnstructuredContent(), current.UnstructuredContent()) { f.log.Info("Updating type ", spec.GroupVersionKind(), " name ", spec.GetName()) - gvr, _ := meta.UnsafeGuessKindToResource(spec.GroupVersionKind()) if _, err = f.client.Resource(gvr).Namespace(current.GetNamespace()).Update(context.Background(), current, v1.UpdateOptions{}); err != nil { return fmt.Errorf("failed to update resource %v - Resource:\n%s", err, toYaml(spec)) } diff --git a/vendor/knative.dev/reconciler-test/pkg/manifest/options.go b/vendor/knative.dev/reconciler-test/pkg/manifest/options.go index bb51fb58a3..6e0aa92b9c 100644 --- a/vendor/knative.dev/reconciler-test/pkg/manifest/options.go +++ b/vendor/knative.dev/reconciler-test/pkg/manifest/options.go @@ -63,6 +63,21 @@ func WithPodAnnotations(additional map[string]interface{}) CfgFn { } } +// WithPodLabels appends pod labels (usually used by types where pod template is embedded) +func WithPodLabels(additional map[string]string) CfgFn { + return func(cfg map[string]interface{}) { + if ann, ok := cfg["podlabels"]; ok { + m := make(map[string]interface{}, len(additional)) + for k, v := range additional { + m[k] = v + } + appendToOriginal(ann, m) + return + } + cfg["podlabels"] = additional + } +} + func appendToOriginal(original interface{}, additional map[string]interface{}) { annotations := original.(map[string]interface{}) for k, v := range additional { @@ -92,3 +107,12 @@ func WithIstioPodAnnotations(cfg map[string]interface{}) { WithAnnotations(podAnnotations)(cfg) WithPodAnnotations(podAnnotations)(cfg) } + +func WithIstioPodLabels(cfg map[string]interface{}) { + podLabels := map[string]string{ + "sidecar.istio.io/inject": "true", + } + + WithLabels(podLabels)(cfg) + WithPodLabels(podLabels)(cfg) +} diff --git a/vendor/knative.dev/reconciler-test/pkg/resources/job/job.go b/vendor/knative.dev/reconciler-test/pkg/resources/job/job.go index b9237df3dd..aee655360d 100644 --- a/vendor/knative.dev/reconciler-test/pkg/resources/job/job.go +++ b/vendor/knative.dev/reconciler-test/pkg/resources/job/job.go @@ -47,6 +47,7 @@ func Install(name string, image string, options ...manifest.CfgFn) feature.StepF if ic := environment.GetIstioConfig(ctx); ic.Enabled { manifest.WithIstioPodAnnotations(cfg) + manifest.WithIstioPodLabels(cfg) } manifest.PodSecurityCfgFn(ctx, t)(cfg) diff --git a/vendor/modules.txt b/vendor/modules.txt index 186324015f..c365607205 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -129,7 +129,7 @@ github.com/go-openapi/jsonreference/internal # github.com/go-openapi/swag v0.23.0 ## explicit; go 1.20 github.com/go-openapi/swag -# github.com/gobuffalo/flect v1.0.2 +# github.com/gobuffalo/flect v1.0.3 ## explicit; go 1.16 github.com/gobuffalo/flect # github.com/gogo/protobuf v1.3.2 @@ -420,8 +420,8 @@ go.opentelemetry.io/otel/trace/embedded # go.uber.org/atomic v1.10.0 ## explicit; go 1.18 go.uber.org/atomic -# go.uber.org/automaxprocs v1.5.3 -## explicit; go 1.18 +# go.uber.org/automaxprocs v1.6.0 +## explicit; go 1.20 go.uber.org/automaxprocs/internal/cgroups go.uber.org/automaxprocs/internal/runtime go.uber.org/automaxprocs/maxprocs @@ -514,13 +514,13 @@ gomodules.xyz/jsonpatch/v2 # google.golang.org/api v0.183.0 ## explicit; go 1.20 google.golang.org/api/support/bundler -# google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a -## explicit; go 1.20 +# google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 +## explicit; go 1.21 google.golang.org/genproto/googleapis/api/httpbody -# google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a -## explicit; go 1.20 +# google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 +## explicit; go 1.21 google.golang.org/genproto/googleapis/rpc/status -# google.golang.org/grpc v1.66.2 +# google.golang.org/grpc v1.67.0 ## explicit; go 1.21 google.golang.org/grpc google.golang.org/grpc/attributes @@ -1160,7 +1160,7 @@ k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/strings/slices k8s.io/utils/trace -# knative.dev/eventing v0.42.1-0.20240918141338-17088813b4e0 +# knative.dev/eventing v0.42.1-0.20240926123447-e7fca7646f4a ## explicit; go 1.22.0 knative.dev/eventing/cmd/event_display knative.dev/eventing/cmd/heartbeats @@ -1349,7 +1349,7 @@ knative.dev/eventing/test/upgrade/prober/wathola/sender # knative.dev/hack v0.0.0-20240909014011-fc6a8452af6d ## explicit; go 1.21 knative.dev/hack -# knative.dev/pkg v0.0.0-20240917091217-aaab500c26c4 +# knative.dev/pkg v0.0.0-20240930065954-503173341499 ## explicit; go 1.22.0 knative.dev/pkg/apiextensions/storageversion knative.dev/pkg/apiextensions/storageversion/cmd/migrate @@ -1466,7 +1466,7 @@ knative.dev/pkg/webhook/json knative.dev/pkg/webhook/resourcesemantics knative.dev/pkg/webhook/resourcesemantics/defaulting knative.dev/pkg/webhook/resourcesemantics/validation -# knative.dev/reconciler-test v0.0.0-20240919063827-0cb8938be2e4 +# knative.dev/reconciler-test v0.0.0-20240926123451-87d857060042 ## explicit; go 1.22.0 knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment