From cfad5be7a28786de4af1150bffbdf2060cfad340 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Fri, 6 Nov 2020 13:09:07 +0100 Subject: [PATCH] [0.17] Update eventing to latest (#1657) Signed-off-by: Pierangelo Di Pilato --- go.mod | 2 +- go.sum | 4 +- .../pkg/apis/eventing/v1/trigger_lifecycle.go | 2 +- .../eventing/pkg/kncloudevents/http_client.go | 106 +++++++ .../pkg/kncloudevents/message_sender.go | 52 +--- .../broker_control_plane_test_helper.go | 4 +- .../helpers/broker_data_plane_test_helper.go | 12 +- .../helpers/broker_tracing_test_helper.go | 18 +- .../channel_status_subscriber_test_helper.go | 4 +- .../helpers/channel_tracing_test_helper.go | 16 +- .../helpers/tracing_test_helper.go | 4 +- .../e2e/helpers/broker_channel_flow_helper.go | 12 +- ...broker_event_transformation_test_helper.go | 12 +- ...channel_event_tranformation_test_helper.go | 12 +- .../test/e2e/helpers/parallel_test_helper.go | 28 +- .../test/e2e/helpers/sequence_test_helper.go | 24 +- .../helpers/trigger_no_broker_test_helper.go | 5 +- .../knative.dev/eventing/test/lib/client.go | 6 + .../eventing/test/lib/dropevents/receiver.go | 14 + .../eventing/test/lib/k8s_events.go | 84 ++++++ .../test/lib/recordevents/event_info.go | 262 +++--------------- .../test/lib/recordevents/event_info_store.go | 208 ++++---------- .../test/lib/recordevents/event_log.go | 33 +++ .../lib/recordevents/logger_vent/logger.go | 27 ++ .../lib/recordevents/observer/observer.go | 193 +++++++++++++ .../test/lib/recordevents/observer/reply.go | 90 ++++++ .../recordevents/recorder_vent/constructor.go | 78 ++++++ .../lib/recordevents/recorder_vent/doc.go | 19 ++ .../recordevents/recorder_vent/recorder.go | 120 ++++++++ .../test/lib/recordevents/resources.go | 152 ++++++++++ .../eventing/test/lib/resources/kube.go | 98 ------- .../eventing/test/lib/test_runner.go | 44 ++- .../test_images/recordevents/eventstore.go | 191 ------------- .../test/test_images/recordevents/main.go | 162 ++--------- vendor/modules.txt | 5 +- 35 files changed, 1190 insertions(+), 913 deletions(-) create mode 100644 vendor/knative.dev/eventing/pkg/kncloudevents/http_client.go create mode 100644 vendor/knative.dev/eventing/test/lib/k8s_events.go create mode 100644 vendor/knative.dev/eventing/test/lib/recordevents/event_log.go create mode 100644 vendor/knative.dev/eventing/test/lib/recordevents/logger_vent/logger.go create mode 100644 vendor/knative.dev/eventing/test/lib/recordevents/observer/observer.go create mode 100644 vendor/knative.dev/eventing/test/lib/recordevents/observer/reply.go create mode 100644 vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/constructor.go create mode 100644 vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/doc.go create mode 100644 vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/recorder.go create mode 100644 vendor/knative.dev/eventing/test/lib/recordevents/resources.go delete mode 100644 vendor/knative.dev/eventing/test/test_images/recordevents/eventstore.go diff --git a/go.mod b/go.mod index c870814cb4..5b7f7ff137 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( k8s.io/apimachinery v0.19.0 k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible k8s.io/utils v0.0.0-20200603063816-c1c6865ac451 - knative.dev/eventing v0.17.5-0.20200925065343-049b8e743bd4 + knative.dev/eventing v0.17.9-0.20201105153307-2fb113c42ff4 knative.dev/pkg v0.0.0-20200824160247-5343c1d19369 knative.dev/serving v0.17.1 knative.dev/test-infra v0.0.0-20200828171708-f68cb78c80a9 diff --git a/go.sum b/go.sum index 3420b36a8b..323735e84e 100644 --- a/go.sum +++ b/go.sum @@ -2028,8 +2028,8 @@ k8s.io/utils v0.0.0-20200603063816-c1c6865ac451/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ knative.dev/caching v0.0.0-20190719140829-2032732871ff/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg= knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg= knative.dev/caching v0.0.0-20200811171106-48c335fed9c8/go.mod h1:XonDcFC2DLSWP71f2y7oYnXUko5d5HsJRnZtkp0wY7g= -knative.dev/eventing v0.17.5-0.20200925065343-049b8e743bd4 h1:Wygx5VC4nZDs9p1Om8EYWqHMSNDZ6gWhio09hj0KVMw= -knative.dev/eventing v0.17.5-0.20200925065343-049b8e743bd4/go.mod h1:9NwCSwLnMCKmgz3YQBNax18mSgBjud8CvfsUUVOZ1sA= +knative.dev/eventing v0.17.9-0.20201105153307-2fb113c42ff4 h1:g6ud+UJbnjht9uciWVf29aUFAI3IKn2PfyTQpdXkD3Y= +knative.dev/eventing v0.17.9-0.20201105153307-2fb113c42ff4/go.mod h1:9NwCSwLnMCKmgz3YQBNax18mSgBjud8CvfsUUVOZ1sA= knative.dev/eventing-contrib v0.6.1-0.20190723221543-5ce18048c08b/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g= knative.dev/eventing-contrib v0.11.2/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g= knative.dev/networking v0.0.0-20200812200006-4d518e76538a h1:E1rnQR9IZvDcEAgoOXMW9LWqevaYFVTlMS2ndgoAO6Y= diff --git a/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go b/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go index 2ad85b219f..aba190f023 100644 --- a/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go +++ b/vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go @@ -167,7 +167,7 @@ func (ts *TriggerStatus) MarkDependencyNotConfigured() { "DependencyNotConfigured", "Dependency has not yet been reconciled.") } -func (ts *TriggerStatus) PropagateDependencyStatus(ks *duckv1.KResource) { +func (ts *TriggerStatus) PropagateDependencyStatus(ks *duckv1.Source) { kc := ks.Status.GetCondition(apis.ConditionReady) if kc == nil { ts.MarkDependencyNotConfigured() diff --git a/vendor/knative.dev/eventing/pkg/kncloudevents/http_client.go b/vendor/knative.dev/eventing/pkg/kncloudevents/http_client.go new file mode 100644 index 0000000000..d9d938f737 --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/kncloudevents/http_client.go @@ -0,0 +1,106 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kncloudevents + +import ( + nethttp "net/http" + "sync" + "time" + + "go.opencensus.io/plugin/ochttp" + "knative.dev/pkg/tracing/propagation/tracecontextb3" +) + +const ( + defaultRetryWaitMin = 1 * time.Second + defaultRetryWaitMax = 30 * time.Second +) + +type holder struct { + clientMutex sync.Mutex + connectionArgs *ConnectionArgs + client **nethttp.Client +} + +var clientHolder = holder{} + +// The used HTTP client is a singleton, so the same http client is reused across all the application. +// If connection args is modified, client is cleaned and a new one is created. +func getClient() *nethttp.Client { + clientHolder.clientMutex.Lock() + defer clientHolder.clientMutex.Unlock() + + if clientHolder.client == nil { + // Add connection options to the default transport. + var base = nethttp.DefaultTransport.(*nethttp.Transport).Clone() + clientHolder.connectionArgs.configureTransport(base) + c := &nethttp.Client{ + // Add output tracing. + Transport: &ochttp.Transport{ + Base: base, + Propagation: tracecontextb3.TraceContextEgress, + }, + } + clientHolder.client = &c + } + + return *clientHolder.client +} + +// ConfigureConnectionArgs configures the new connection args. +// The existing client won't be affected, but a new one will be created. +// Use sparingly, because it might lead to creating a lot of clients, none of them sharing their connection pool! +func ConfigureConnectionArgs(ca *ConnectionArgs) { + clientHolder.clientMutex.Lock() + defer clientHolder.clientMutex.Unlock() + + // Check if same config + if clientHolder.connectionArgs != nil && + ca != nil && + ca.MaxIdleConns == clientHolder.connectionArgs.MaxIdleConns && + ca.MaxIdleConnsPerHost == clientHolder.connectionArgs.MaxIdleConnsPerHost { + return + } + + if clientHolder.client != nil { + // Let's try to clean up a bit the existing client + // Note: this won't remove it nor close it + (*clientHolder.client).CloseIdleConnections() + + // Setting client to nil + clientHolder.client = nil + } + + clientHolder.connectionArgs = ca +} + +// ConnectionArgs allow to configure connection parameters to the underlying +// HTTP Client transport. +type ConnectionArgs struct { + // MaxIdleConns refers to the max idle connections, as in net/http/transport. + MaxIdleConns int + // MaxIdleConnsPerHost refers to the max idle connections per host, as in net/http/transport. + MaxIdleConnsPerHost int +} + +func (ca *ConnectionArgs) configureTransport(transport *nethttp.Transport) { + if ca == nil { + return + } + transport.MaxIdleConns = ca.MaxIdleConns + transport.MaxIdleConnsPerHost = ca.MaxIdleConnsPerHost +} diff --git a/vendor/knative.dev/eventing/pkg/kncloudevents/message_sender.go b/vendor/knative.dev/eventing/pkg/kncloudevents/message_sender.go index 1a425cd601..ae6d5f54c0 100644 --- a/vendor/knative.dev/eventing/pkg/kncloudevents/message_sender.go +++ b/vendor/knative.dev/eventing/pkg/kncloudevents/message_sender.go @@ -25,17 +25,10 @@ import ( "github.com/hashicorp/go-retryablehttp" "github.com/rickb777/date/period" - "go.opencensus.io/plugin/ochttp" - "knative.dev/pkg/tracing/propagation/tracecontextb3" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" ) -const ( - defaultRetryWaitMin = 1 * time.Second - defaultRetryWaitMax = 30 * time.Second -) - var noRetries = RetryConfig{ RetryMax: 0, CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { @@ -46,41 +39,15 @@ var noRetries = RetryConfig{ }, } -// ConnectionArgs allow to configure connection parameters to the underlying -// HTTP Client transport. -type ConnectionArgs struct { - // MaxIdleConns refers to the max idle connections, as in net/http/transport. - MaxIdleConns int - // MaxIdleConnsPerHost refers to the max idle connections per host, as in net/http/transport. - MaxIdleConnsPerHost int -} - -func (ca *ConnectionArgs) ConfigureTransport(transport *nethttp.Transport) { - if ca == nil { - return - } - transport.MaxIdleConns = ca.MaxIdleConns - transport.MaxIdleConnsPerHost = ca.MaxIdleConnsPerHost -} - type HttpMessageSender struct { Client *nethttp.Client Target string } -func NewHttpMessageSender(connectionArgs *ConnectionArgs, target string) (*HttpMessageSender, error) { - // Add connection options to the default transport. - var base = nethttp.DefaultTransport.(*nethttp.Transport).Clone() - connectionArgs.ConfigureTransport(base) - // Add output tracing. - client := &nethttp.Client{ - Transport: &ochttp.Transport{ - Base: base, - Propagation: tracecontextb3.TraceContextEgress, - }, - } - - return &HttpMessageSender{Client: client, Target: target}, nil +// Deprecated: Don't use this anymore, now it has the same effect of NewHTTPMessageSenderWithTarget +// If you need to modify the connection args, use ConfigureConnectionArgs sparingly. +func NewHttpMessageSender(ca *ConnectionArgs, target string) (*HttpMessageSender, error) { + return &HttpMessageSender{Client: getClient(), Target: target}, nil } func (s *HttpMessageSender) NewCloudEventRequest(ctx context.Context) (*nethttp.Request, error) { @@ -139,7 +106,12 @@ func (s *HttpMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC }, } - return retryableClient.Do(&retryablehttp.Request{Request: req}) + retryableReq, err := retryablehttp.FromRequest(req) + if err != nil { + return nil, err + } + + return retryableClient.Do(retryableReq) } func NoRetries() RetryConfig { @@ -179,6 +151,6 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) return retryConfig, nil } -func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) { - return resp != nil && resp.StatusCode >= 300, nil +func checkRetry(_ context.Context, resp *nethttp.Response, err error) (bool, error) { + return !(resp != nil && resp.StatusCode < 300), err } diff --git a/vendor/knative.dev/eventing/test/conformance/helpers/broker_control_plane_test_helper.go b/vendor/knative.dev/eventing/test/conformance/helpers/broker_control_plane_test_helper.go index c9a975fded..87f58ac151 100644 --- a/vendor/knative.dev/eventing/test/conformance/helpers/broker_control_plane_test_helper.go +++ b/vendor/knative.dev/eventing/test/conformance/helpers/broker_control_plane_test_helper.go @@ -24,6 +24,7 @@ import ( "knative.dev/pkg/reconciler" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + "knative.dev/eventing/test/lib/recordevents" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/duck" @@ -81,8 +82,7 @@ func triggerV1Beta1BeforeBrokerHelper(triggerName string, client *testlib.Client const etLogger = "logger" const loggerPodName = "logger-pod" - logPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName)) + _ = recordevents.DeployEventRecordOrFail(client, loggerPodName) client.WaitForAllTestResourcesReadyOrFail() // Can't do this for the trigger because it's not 'ready' yet client.CreateTriggerOrFailV1Beta1(triggerName, resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, etLogger, map[string]interface{}{}), diff --git a/vendor/knative.dev/eventing/test/conformance/helpers/broker_data_plane_test_helper.go b/vendor/knative.dev/eventing/test/conformance/helpers/broker_data_plane_test_helper.go index 664342c8ee..29db37c611 100644 --- a/vendor/knative.dev/eventing/test/conformance/helpers/broker_data_plane_test_helper.go +++ b/vendor/knative.dev/eventing/test/conformance/helpers/broker_data_plane_test_helper.go @@ -316,13 +316,15 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper( source := "origin-for-reply" event.SetSource(source) msg := []byte(`{"msg":"Transformed!"}`) - transformPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + client, "transformer-pod", - "reply-check-type", - "reply-check-source", - msg, + recordevents.ReplyWithTransformedEvent( + "reply-check-type", + "reply-check-source", + string(msg), + ), ) - client.CreatePodOrFail(transformPod, testlib.WithService("transformer-pod")) client.WaitForServiceEndpointsOrFail("transformer-pod", 1) transformTrigger := client.CreateTriggerOrFailV1Beta1( "transform-trigger", diff --git a/vendor/knative.dev/eventing/test/conformance/helpers/broker_tracing_test_helper.go b/vendor/knative.dev/eventing/test/conformance/helpers/broker_tracing_test_helper.go index 597a2c6884..1d3f9f63e5 100644 --- a/vendor/knative.dev/eventing/test/conformance/helpers/broker_tracing_test_helper.go +++ b/vendor/knative.dev/eventing/test/conformance/helpers/broker_tracing_test_helper.go @@ -29,6 +29,7 @@ import ( "knative.dev/eventing/pkg/utils" tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" "knative.dev/eventing/test/lib/sender" ) @@ -78,8 +79,7 @@ func setupBrokerTracing(brokerClass string) SetupTracingTestInfrastructureFunc { ) // Create a logger (EventRecord) Pod and a K8s Service that points to it. - logPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName)) + _ = recordevents.DeployEventRecordOrFail(client, loggerPodName) // Create a Trigger that receives events (type=bar) and sends them to the logger Pod. loggerTrigger := client.CreateTriggerOrFailV1Beta1( @@ -89,15 +89,17 @@ func setupBrokerTracing(brokerClass string) SetupTracingTestInfrastructureFunc { resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerPodName), ) - // Create a transformer (EventTransfrmer) Pod that replies with the same event as the input, + // Create a transformer Pod (recordevents with transform reply) that replies with the same event as the input, // except the reply's event's type is changed to bar. - eventTransformerPod := resources.EventTransformationPod( + eventTransformerPod := recordevents.DeployEventRecordOrFail( + client, "transformer", - etLogger, - senderName, - []byte(eventBody), + recordevents.ReplyWithTransformedEvent( + etLogger, + senderName, + eventBody, + ), ) - client.CreatePodOrFail(eventTransformerPod, testlib.WithService(eventTransformerPod.Name)) // Create a Trigger that receives events (type=foo) and sends them to the transformer Pod. transformerTrigger := client.CreateTriggerOrFailV1Beta1( diff --git a/vendor/knative.dev/eventing/test/conformance/helpers/channel_status_subscriber_test_helper.go b/vendor/knative.dev/eventing/test/conformance/helpers/channel_status_subscriber_test_helper.go index 125661a16b..05c7e0f4ee 100644 --- a/vendor/knative.dev/eventing/test/conformance/helpers/channel_status_subscriber_test_helper.go +++ b/vendor/knative.dev/eventing/test/conformance/helpers/channel_status_subscriber_test_helper.go @@ -22,6 +22,7 @@ import ( duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" eventingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" corev1 "k8s.io/api/core/v1" @@ -56,8 +57,7 @@ func channelHasRequiredSubscriberStatus(st *testing.T, client *testlib.Client, c client.CreateChannelOrFail(channelName, &channel) client.WaitForResourceReadyOrFail(channelName, &channel) - pod := resources.EventRecordPod(subscriberServiceName + "-pod") - client.CreatePodOrFail(pod, testlib.WithService(subscriberServiceName)) + _ = recordevents.DeployEventRecordOrFail(client, subscriberServiceName+"-pod") subscription := client.CreateSubscriptionOrFail( subscriberServiceName, diff --git a/vendor/knative.dev/eventing/test/conformance/helpers/channel_tracing_test_helper.go b/vendor/knative.dev/eventing/test/conformance/helpers/channel_tracing_test_helper.go index 2447971b1c..f7c7058e42 100644 --- a/vendor/knative.dev/eventing/test/conformance/helpers/channel_tracing_test_helper.go +++ b/vendor/knative.dev/eventing/test/conformance/helpers/channel_tracing_test_helper.go @@ -28,6 +28,7 @@ import ( tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" "knative.dev/eventing/test/lib/sender" ) @@ -66,17 +67,18 @@ func setupChannelTracingWithReply( client.CreateChannelOrFail(replyChannelName, channel) // Create the 'sink', a LogEvents Pod and a K8s Service that points to it. - recordEventsPod := resources.EventRecordPod(recordEventsPodName) - client.CreatePodOrFail(recordEventsPod, testlib.WithService(recordEventsPodName)) + recordEventsPod := recordevents.DeployEventRecordOrFail(client, recordEventsPodName) // Create the subscriber, a Pod that mutates the event. - transformerPod := resources.EventTransformationPod( + transformerPod := recordevents.DeployEventRecordOrFail( + client, "transformer", - "mutated", - eventSource, - nil, + recordevents.ReplyWithTransformedEvent( + "mutated", + eventSource, + "", + ), ) - client.CreatePodOrFail(transformerPod, testlib.WithService(transformerPod.Name)) // Create the Subscription linking the Channel to the mutator. client.CreateSubscriptionOrFail( diff --git a/vendor/knative.dev/eventing/test/conformance/helpers/tracing_test_helper.go b/vendor/knative.dev/eventing/test/conformance/helpers/tracing_test_helper.go index 9f4d5772f0..d5d1a84ef4 100644 --- a/vendor/knative.dev/eventing/test/conformance/helpers/tracing_test_helper.go +++ b/vendor/knative.dev/eventing/test/conformance/helpers/tracing_test_helper.go @@ -65,7 +65,7 @@ func tracingTest( expectedTestSpan, eventMatcher := setupInfrastructure(t, &channel, client, recordEventsPodName, true) // Start the event info store and assert the event was received correctly - targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) + targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName, client.Namespace) if err != nil { t.Fatalf("Pod tracker failed: %v", err) } @@ -105,6 +105,6 @@ func getTraceIDHeader(t *testing.T, evInfos []recordevents.EventInfo) string { } } } - t.Fatalf("FAIL: No traceid in %d messages: (%s)", len(evInfos), evInfos) + t.Fatalf("FAIL: No traceid in %d messages: (%v)", len(evInfos), evInfos) return "" } diff --git a/vendor/knative.dev/eventing/test/e2e/helpers/broker_channel_flow_helper.go b/vendor/knative.dev/eventing/test/e2e/helpers/broker_channel_flow_helper.go index ce216c5ce7..2ee1dd9df8 100644 --- a/vendor/knative.dev/eventing/test/e2e/helpers/broker_channel_flow_helper.go +++ b/vendor/knative.dev/eventing/test/e2e/helpers/broker_channel_flow_helper.go @@ -104,13 +104,15 @@ func BrokerChannelFlowWithTransformation(t *testing.T, } // create the transformation service for trigger1 - transformationPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + client, transformationPodName, - transformedEventType, - transformedEventSource, - []byte(transformedBody), + recordevents.ReplyWithTransformedEvent( + transformedEventType, + transformedEventSource, + transformedBody, + ), ) - client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName)) // create trigger1 to receive the original event, and do event transformation if triggerVersion == "v1" { diff --git a/vendor/knative.dev/eventing/test/e2e/helpers/broker_event_transformation_test_helper.go b/vendor/knative.dev/eventing/test/e2e/helpers/broker_event_transformation_test_helper.go index dae341369b..6c73a639a3 100644 --- a/vendor/knative.dev/eventing/test/e2e/helpers/broker_event_transformation_test_helper.go +++ b/vendor/knative.dev/eventing/test/e2e/helpers/broker_event_transformation_test_helper.go @@ -70,13 +70,15 @@ func EventTransformationForTriggerTestHelper(t *testing.T, client.WaitForResourceReadyOrFail(brokerName, testlib.BrokerTypeMeta) // create the transformation service - transformationPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + client, transformationPodName, - transformedEventType, - transformedEventSource, - []byte(transformedBody), + recordevents.ReplyWithTransformedEvent( + transformedEventType, + transformedEventSource, + transformedBody, + ), ) - client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName)) // create trigger1 for event transformation if triggerVersion == "v1" { diff --git a/vendor/knative.dev/eventing/test/e2e/helpers/channel_event_tranformation_test_helper.go b/vendor/knative.dev/eventing/test/e2e/helpers/channel_event_tranformation_test_helper.go index 9b252ff4f3..2d80f981bb 100644 --- a/vendor/knative.dev/eventing/test/e2e/helpers/channel_event_tranformation_test_helper.go +++ b/vendor/knative.dev/eventing/test/e2e/helpers/channel_event_tranformation_test_helper.go @@ -62,13 +62,15 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T, if err := eventAfterTransformation.SetData(cloudevents.ApplicationJSON, []byte(transformedEventBody)); err != nil { t.Fatalf("Cannot set the payload of the event: %s", err.Error()) } - transformationPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + client, transformationPodName, - eventAfterTransformation.Type(), - eventAfterTransformation.Source(), - eventAfterTransformation.Data(), + recordevents.ReplyWithTransformedEvent( + eventAfterTransformation.Type(), + eventAfterTransformation.Source(), + string(eventAfterTransformation.Data()), + ), ) - client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName)) // create event logger pod and service as the subscriber eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName) diff --git a/vendor/knative.dev/eventing/test/e2e/helpers/parallel_test_helper.go b/vendor/knative.dev/eventing/test/e2e/helpers/parallel_test_helper.go index daf2edfda6..180cc90729 100644 --- a/vendor/knative.dev/eventing/test/e2e/helpers/parallel_test_helper.go +++ b/vendor/knative.dev/eventing/test/e2e/helpers/parallel_test_helper.go @@ -71,13 +71,19 @@ func ParallelTestHelper(t *testing.T, for branchNumber, cse := range tc.branchesConfig { // construct filter services filterPodName := fmt.Sprintf("parallel-%s-branch-%d-filter", tc.name, branchNumber) - filterPod := resources.EventFilteringPod(filterPodName, cse.filter) - client.CreatePodOrFail(filterPod, testlib.WithService(filterPodName)) + if cse.filter { + recordevents.DeployEventRecordOrFail(client, filterPodName) + } else { + recordevents.DeployEventRecordOrFail(client, filterPodName, recordevents.EchoEvent) + } // construct branch subscriber subPodName := fmt.Sprintf("parallel-%s-branch-%d-sub", tc.name, branchNumber) - subPod := resources.SequenceStepperPod(subPodName, subPodName) - client.CreatePodOrFail(subPod, testlib.WithService(subPodName)) + recordevents.DeployEventRecordOrFail( + client, + subPodName, + recordevents.ReplyWithAppendedData(subPodName), + ) parallelBranches[branchNumber] = v1beta1.ParallelBranch{ Filter: &duckv1.Destination{ @@ -177,13 +183,19 @@ func ParallelV1TestHelper(t *testing.T, for branchNumber, cse := range tc.branchesConfig { // construct filter services filterPodName := fmt.Sprintf("parallel-%s-branch-%d-filter", tc.name, branchNumber) - filterPod := resources.EventFilteringPod(filterPodName, cse.filter) - client.CreatePodOrFail(filterPod, testlib.WithService(filterPodName)) + if cse.filter { + recordevents.DeployEventRecordOrFail(client, filterPodName) + } else { + recordevents.DeployEventRecordOrFail(client, filterPodName, recordevents.EchoEvent) + } // construct branch subscriber subPodName := fmt.Sprintf("parallel-%s-branch-%d-sub", tc.name, branchNumber) - subPod := resources.SequenceStepperPod(subPodName, subPodName) - client.CreatePodOrFail(subPod, testlib.WithService(subPodName)) + recordevents.DeployEventRecordOrFail( + client, + subPodName, + recordevents.ReplyWithAppendedData(subPodName), + ) parallelBranches[branchNumber] = flowsv1.ParallelBranch{ Filter: &duckv1.Destination{ diff --git a/vendor/knative.dev/eventing/test/e2e/helpers/sequence_test_helper.go b/vendor/knative.dev/eventing/test/e2e/helpers/sequence_test_helper.go index 321957d96b..9f9b5e60b6 100644 --- a/vendor/knative.dev/eventing/test/e2e/helpers/sequence_test_helper.go +++ b/vendor/knative.dev/eventing/test/e2e/helpers/sequence_test_helper.go @@ -73,9 +73,11 @@ func SequenceTestHelper(t *testing.T, // create a stepper Pod with Service podName := config.podName msgAppender := config.msgAppender - stepperPod := resources.SequenceStepperPod(podName, msgAppender) + recordevents.DeployEventRecordOrFail( + client, podName, + recordevents.ReplyWithAppendedData(msgAppender), + ) - client.CreatePodOrFail(stepperPod, testlib.WithService(podName)) // create a new step step := v1beta1.SequenceStep{ Destination: duckv1.Destination{ @@ -128,8 +130,7 @@ func SequenceTestHelper(t *testing.T, event.SetSource(eventSource) event.SetType(testlib.DefaultEventType) msg := fmt.Sprintf("TestSequence %s", uuid.New().String()) - body := fmt.Sprintf(`{"msg":"%s"}`, msg) - if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + if err := event.SetData(cloudevents.TextPlain, msg); err != nil { st.Fatalf("Cannot set the payload of the event: %s", err.Error()) } client.SendEventToAddressable( @@ -145,7 +146,8 @@ func SequenceTestHelper(t *testing.T, } eventTracker.AssertAtLeast(1, recordevents.MatchEvent( cetest.HasSource(eventSource), - cetest.DataContains(expectedMsg), + cetest.HasDataContentType(cloudevents.TextPlain), + cetest.HasData([]byte(expectedMsg)), )) }) } @@ -186,9 +188,11 @@ func SequenceV1TestHelper(t *testing.T, // create a stepper Pod with Service podName := config.podName msgAppender := config.msgAppender - stepperPod := resources.SequenceStepperPod(podName, msgAppender) + recordevents.DeployEventRecordOrFail( + client, podName, + recordevents.ReplyWithAppendedData(msgAppender), + ) - client.CreatePodOrFail(stepperPod, testlib.WithService(podName)) // create a new step step := flowsv1.SequenceStep{ Destination: duckv1.Destination{ @@ -241,8 +245,7 @@ func SequenceV1TestHelper(t *testing.T, event.SetSource(eventSource) event.SetType(testlib.DefaultEventType) msg := fmt.Sprintf("TestSequence %s", uuid.New().String()) - body := fmt.Sprintf(`{"msg":"%s"}`, msg) - if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + if err := event.SetData(cloudevents.TextPlain, msg); err != nil { st.Fatalf("Cannot set the payload of the event: %s", err.Error()) } client.SendEventToAddressable( @@ -258,7 +261,8 @@ func SequenceV1TestHelper(t *testing.T, } eventTracker.AssertAtLeast(1, recordevents.MatchEvent( cetest.HasSource(eventSource), - cetest.DataContains(expectedMsg), + cetest.HasDataContentType(cloudevents.TextPlain), + cetest.HasData([]byte(expectedMsg)), )) }) } diff --git a/vendor/knative.dev/eventing/test/e2e/helpers/trigger_no_broker_test_helper.go b/vendor/knative.dev/eventing/test/e2e/helpers/trigger_no_broker_test_helper.go index 0f7d04f244..cd1e97160d 100644 --- a/vendor/knative.dev/eventing/test/e2e/helpers/trigger_no_broker_test_helper.go +++ b/vendor/knative.dev/eventing/test/e2e/helpers/trigger_no_broker_test_helper.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" ) @@ -40,8 +41,8 @@ func TestTriggerNoBroker(t *testing.T, channel string, brokerCreator BrokerCreat brokerName := strings.ToLower(channel) subscriberName := "dumper-empty" - eventRecordPod := resources.EventRecordPod(subscriberName) - client.CreatePodOrFail(eventRecordPod, testlib.WithService(subscriberName)) + recordevents.DeployEventRecordOrFail(client, subscriberName) + client.CreateTriggerOrFailV1Beta1("testtrigger", resources.WithSubscriberServiceRefForTriggerV1Beta1(subscriberName), resources.WithBrokerV1Beta1(brokerName), diff --git a/vendor/knative.dev/eventing/test/lib/client.go b/vendor/knative.dev/eventing/test/lib/client.go index b26e043473..f8868acdca 100644 --- a/vendor/knative.dev/eventing/test/lib/client.go +++ b/vendor/knative.dev/eventing/test/lib/client.go @@ -47,6 +47,8 @@ type Client struct { Dynamic dynamic.Interface Config *rest.Config + EventListener *EventListener + Namespace string T *testing.T Tracker *Tracker @@ -92,6 +94,10 @@ func NewClient(configPath string, clusterName string, namespace string, t *testi client.T = t client.Tracker = NewTracker(t, client.Dynamic) + // Start informer + client.EventListener = NewEventListener(client.Kube.Kube, client.Namespace, client.T.Logf) + client.Cleanup(client.EventListener.Stop) + client.tracingEnv, err = getTracingConfig(client.Kube.Kube) if err != nil { return nil, err diff --git a/vendor/knative.dev/eventing/test/lib/dropevents/receiver.go b/vendor/knative.dev/eventing/test/lib/dropevents/receiver.go index 4453b4f471..6b18f2be10 100644 --- a/vendor/knative.dev/eventing/test/lib/dropevents/receiver.go +++ b/vendor/knative.dev/eventing/test/lib/dropevents/receiver.go @@ -32,6 +32,20 @@ const ( NumberKey = "NUMBER" ) +// count is only used for SKIP_ALGORITHM=Sequence. +func SkipperAlgorithmWithCount(algorithm string, count uint64) Skipper { + switch algorithm { + case Fibonacci: + return &dropeventsfibonacci.Fibonacci{Prev: 1, Current: 1} + + case Sequence: + return dropeventsfirst.First{N: count} + + default: + panic("unknown algorithm: " + algorithm) + } +} + func SkipperAlgorithm(algorithm string) Skipper { switch algorithm { diff --git a/vendor/knative.dev/eventing/test/lib/k8s_events.go b/vendor/knative.dev/eventing/test/lib/k8s_events.go new file mode 100644 index 0000000000..1188d5c234 --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/k8s_events.go @@ -0,0 +1,84 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lib + +import ( + "context" + "sync" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +// EventHandler is the callback type for the EventListener +type EventHandler func(event *corev1.Event) + +// EventListener is a type that broadcasts new k8s events +type EventListener struct { + cancel context.CancelFunc + + lock sync.Mutex + handlers []EventHandler +} + +// NewEventListener creates a new event listener +func NewEventListener(client kubernetes.Interface, namespace string, logf func(string, ...interface{})) *EventListener { + ctx, cancelCtx := context.WithCancel(context.Background()) + informerFactory := informers.NewSharedInformerFactoryWithOptions( + client, + 0, + informers.WithNamespace(namespace), + ) + eventsInformer := informerFactory.Core().V1().Events().Informer() + + el := EventListener{ + cancel: cancelCtx, + } + + eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + el.handle(obj.(*corev1.Event)) + }, + }) + + go func() { + eventsInformer.Run(ctx.Done()) + logf("EventListener stopped") + }() + + return &el +} + +func (el *EventListener) handle(event *corev1.Event) { + el.lock.Lock() + defer el.lock.Unlock() + for _, handler := range el.handlers { + handler(event) + } +} + +func (el *EventListener) AddHandler(handler EventHandler) { + el.lock.Lock() + defer el.lock.Unlock() + el.handlers = append(el.handlers, handler) +} + +func (el *EventListener) Stop() { + el.cancel() +} diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/event_info.go b/vendor/knative.dev/eventing/test/lib/recordevents/event_info.go index e3fac76795..35a4d0265f 100644 --- a/vendor/knative.dev/eventing/test/lib/recordevents/event_info.go +++ b/vendor/knative.dev/eventing/test/lib/recordevents/event_info.go @@ -17,65 +17,61 @@ limitations under the License. package recordevents import ( - "encoding/json" "fmt" - "io/ioutil" - "math/rand" - "net/http" "strings" "time" cloudevents "github.com/cloudevents/sdk-go/v2" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "knative.dev/pkg/test/monitoring" - - "knative.dev/pkg/test/logging" - - testlib "knative.dev/eventing/test/lib" ) -// Port for the recordevents pod REST listener -const RecordEventsPort = 8392 - -// HTTP path for the GetMinMax REST call -const GetMinMaxPath = "/minmax" - -// HTTP path for the GetEntry REST call -const GetEntryPath = "/entry/" - -// HTTP path for the TrimThrough REST call -const TrimThroughPath = "/trimthrough/" - -// On-wire json rest api format for recordevents GetMinMax calls -// sennt to the recordevents pod. -type MinMaxResponse struct { - MinAvail int - MaxSeen int -} +const ( + // EventReason is the Kubernetes event reason used for observed events. + CloudEventObservedReason = "CloudEventObserved" +) // Structure to hold information about an event seen by recordevents pod. type EventInfo struct { // Set if the http request received by the pod couldn't be decoded or // didn't pass validation - Error string + Error string `json:"error,omitempty"` // Event received if the cloudevent received by the pod passed validation - Event *cloudevents.Event + Event *cloudevents.Event `json:"event,omitempty"` // HTTPHeaders of the connection that delivered the event - HTTPHeaders map[string][]string + HTTPHeaders map[string][]string `json:"httpHeaders,omitempty"` + Origin string `json:"origin,omitempty"` + Observer string `json:"observer,omitempty"` + Time time.Time `json:"time,omitempty"` + Sequence uint64 `json:"sequence"` + Dropped bool `json:"dropped"` } -// Pretty print the event. Meant for debugging. This formats the validation error -// or the full event as appropriate. This does NOT format the headers. +// Pretty print the event. Meant for debugging. func (ei *EventInfo) String() string { + var sb strings.Builder + sb.WriteString("-- EventInfo --\n") if ei.Event != nil { - return ei.Event.String() - } else { - return fmt.Sprintf("invalid event \"%s\"", ei.Error) + sb.WriteString("--- Event ---\n") + sb.WriteString(ei.Event.String()) + sb.WriteRune('\n') + sb.WriteRune('\n') + } + if ei.Error != "" { + sb.WriteString("--- Error ---\n") + sb.WriteString(ei.Error) + sb.WriteRune('\n') + sb.WriteRune('\n') } + sb.WriteString("--- HTTP headers ---\n") + for k, v := range ei.HTTPHeaders { + sb.WriteString(" " + k + ": " + v[0] + "\n") + } + sb.WriteRune('\n') + sb.WriteString("--- Origin: '" + ei.Origin + "' ---\n") + sb.WriteString("--- Observer: '" + ei.Observer + "' ---\n") + sb.WriteString("--- Time: " + ei.Time.String() + " ---\n") + sb.WriteString(fmt.Sprintf("--- Sequence: %d ---\n", ei.Sequence)) + sb.WriteString(fmt.Sprintf("--- Dropped: %v ---\n", ei.Dropped)) + return sb.String() } // This is mainly used for providing better failure messages @@ -94,189 +90,3 @@ func (s *SearchedInfo) String() string { } return sb.String() } - -// Connection state for a REST connection to a pod -type eventGetter struct { - podName string - podNamespace string - podPort int - kubeClientset kubernetes.Interface - logf logging.FormatLogger - - host string - port int - forwardPID int -} - -// Creates a forwarded port to the specified recordevents pod and waits until -// it can successfully talk to the REST API. Times out after timeoutEvRetry -func newEventGetter(podName string, client *testlib.Client, logf logging.FormatLogger) (eventGetterInterface, error) { - egi := &eventGetter{podName: podName, podNamespace: client.Namespace, - kubeClientset: client.Kube.Kube, podPort: RecordEventsPort, logf: logf} - err := egi.forwardPort() - if err != nil { - return nil, err - } - - err = egi.waitTillUp() - if err != nil { - return nil, err - } - return egi, nil -} - -// Get information about the provided podName. Uses list (rather than get) and -// returns a pod list for compatibility with the monitoring.PortForward -// interface -func (eg *eventGetter) getRunningPodInfo(podName, namespace string) (*v1.PodList, error) { - pods, err := eg.kubeClientset.CoreV1().Pods(namespace).List( - metav1.ListOptions{FieldSelector: fmt.Sprintf("metadata.name=%s", podName)}) - if err == nil && len(pods.Items) != 1 { - err = fmt.Errorf("no %s Pod found on the cluster", podName) - } else if pods.Items[0].Status.Phase != corev1.PodRunning { - err = fmt.Errorf("pod %s in state %s, wanted Running", podName, - pods.Items[0].Status.Phase) - } - - return pods, err -} - -// Try to forward the pod port to a local port somewhere in the range 30000-60000. -// keeps retrying with random ports in that range, timing out after timeoutEvRetry -func (eg *eventGetter) forwardPort() error { - portRand := rand.New(rand.NewSource(time.Now().UnixNano())) - portMin := 30000 - portMax := 60000 - var internalErr error - - wait.PollImmediate(minEvRetryInterval, timeoutEvRetry, func() (bool, error) { - localPort := portMin + portRand.Intn(portMax-portMin) - if err := monitoring.CheckPortAvailability(localPort); err != nil { - internalErr = err - return false, nil - } - pods, err := eg.getRunningPodInfo(eg.podName, eg.podNamespace) - if err != nil { - internalErr = err - return false, nil - } - - pid, err := monitoring.PortForward(eg.logf, pods, localPort, eg.podPort, eg.podNamespace) - if err != nil { - internalErr = err - return false, nil - } - internalErr = nil - - eg.forwardPID = pid - eg.port = localPort - eg.host = "localhost" - return true, nil - }) - if internalErr != nil { - return fmt.Errorf("timeout forwarding port: %v", internalErr) - } - return nil -} - -// Return the min available, max seen by the recordevents pod. -// maxRet is the largest event that has ever been seen (whether it's been trimmed -// or not). minRet is the smallest event still available via Get, or 1+maxRet if -// no events are available. maxRet starts at 0 when no events have been seen. -func (eg *eventGetter) getMinMax() (minRet int, maxRet int, errRet error) { - resp, err := http.Get(fmt.Sprintf("http://%s:%d%s", eg.host, eg.port, GetMinMaxPath)) - if err != nil { - return -1, -1, fmt.Errorf("http get error: %v", err) - } - defer resp.Body.Close() - bodyContents, err := ioutil.ReadAll(resp.Body) - if err != nil { - return -1, -1, fmt.Errorf("error reading response body %w", err) - } - if resp.StatusCode != http.StatusOK { - return -1, -1, fmt.Errorf("error %d reading GetMinMax response", resp.StatusCode) - } - minMaxResponse := MinMaxResponse{} - err = json.Unmarshal(bodyContents, &minMaxResponse) - if err != nil { - return -1, -1, fmt.Errorf("error unmarshalling response %w", err) - } - if minMaxResponse.MinAvail == 0 { - return -1, -1, fmt.Errorf("invalid decoded json: %+v", minMaxResponse) - } - - return minMaxResponse.MinAvail, minMaxResponse.MaxSeen, nil -} - -// Return the event with the provided sequence number. Returns the appropriate -// EventInfo or an error if no such event is known, or the event has already -// been trimmed. -func (eg *eventGetter) getEntry(seqno int) (EventInfo, error) { - resp, err := http.Get(fmt.Sprintf("http://%s:%d%s/%d", eg.host, eg.port, GetEntryPath, seqno)) - if err != nil { - return EventInfo{}, fmt.Errorf("http get err %v", err) - } - defer resp.Body.Close() - bodyContents, err := ioutil.ReadAll(resp.Body) - if err != nil { - return EventInfo{}, fmt.Errorf("error reading response body %w", err) - } - if resp.StatusCode != http.StatusOK { - return EventInfo{}, fmt.Errorf("error %d reading GetEntry response", resp.StatusCode) - } - entryResponse := EventInfo{} - err = json.Unmarshal(bodyContents, &entryResponse) - if err != nil { - return EventInfo{}, fmt.Errorf("error unmarshalling response %w", err) - } - if len(entryResponse.Error) == 0 && entryResponse.Event == nil { - return EventInfo{}, fmt.Errorf("invalid decoded json: %+v", entryResponse) - } - - return entryResponse, nil -} - -// Trim the events up to and including seqno from the recordevents pod. -// Returns an error if a nonsensical seqno is passed in, but does not return -// error for trimming already trimmed regions. -func (eg *eventGetter) trimThrough(seqno int) error { - resp, err := http.Post(fmt.Sprintf("http://%s:%d%s/%d", eg.host, eg.port, TrimThroughPath, seqno), "", nil) - if err != nil { - return fmt.Errorf("http post err %v", err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body %w", err) - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("error %d reading TrimThrough response: %s", resp.StatusCode, string(body)) - } - - return nil -} - -// Clean up the getter by tearing down the port forward. -func (eg *eventGetter) cleanup() { - pid := eg.forwardPID - eg.forwardPID = 0 - if pid != 0 { - monitoring.Cleanup(pid) - } -} - -// Wait (up to timeoutEvRetry) for the pod to RestAPI to answer request. -func (eg *eventGetter) waitTillUp() error { - var internalErr error - wait.PollImmediate(minEvRetryInterval, timeoutEvRetry, func() (bool, error) { - _, _, internalErr = eg.getMinMax() - if internalErr != nil { - return false, nil - } - return true, nil - }) - if internalErr != nil { - return fmt.Errorf("timeout waiting for recordevents pod to come up: %v", internalErr) - } - return nil -} diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/event_info_store.go b/vendor/knative.dev/eventing/test/lib/recordevents/event_info_store.go index 3b0c46cf44..bedd6249be 100644 --- a/vendor/knative.dev/eventing/test/lib/recordevents/event_info_store.go +++ b/vendor/knative.dev/eventing/test/lib/recordevents/event_info_store.go @@ -17,6 +17,7 @@ limitations under the License. package recordevents import ( + "encoding/json" "fmt" "strconv" "strings" @@ -27,177 +28,84 @@ import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" - pkgTest "knative.dev/pkg/test" testlib "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/resources" ) const ( // The interval and timeout used for checking events - minEvRetryInterval = 4 * time.Second - timeoutEvRetry = 4 * time.Minute + retryInterval = 4 * time.Second + retryTimeout = 4 * time.Minute ) +// Deploys a new recordevents pod and start the associated EventInfoStore +func StartEventRecordOrFail(client *testlib.Client, podName string, options ...EventRecordOption) (*EventInfoStore, *corev1.Pod) { + eventRecordPod := DeployEventRecordOrFail(client, podName, options...) + + eventTracker, err := NewEventInfoStore(client, podName, client.Namespace) + if err != nil { + client.T.Fatalf("Failed to start the EventInfoStore associated to pod '%s': %v", podName, err) + } + return eventTracker, eventRecordPod +} + // Stateful store of events received by the recordevents pod it is pointed at. // This pulls events from the pod during any Find or Wait call, storing them // locally and triming them from the remote pod store. type EventInfoStore struct { tb testing.TB - podName string - getter eventGetterInterface - - lock sync.Mutex - allEvents []EventInfo - firstID int - closeCh chan struct{} - doRefresh chan chan error - timeout time.Duration - retryInterval time.Duration -} - -// Functions used for getting data from the REST api of the recordevents pod. -// The interface exists for use with unit tests of this module. -type eventGetterInterface interface { - getMinMax() (minRet int, maxRet int, errRet error) - getEntry(seqno int) (EventInfo, error) - trimThrough(seqno int) error - cleanup() -} + podName string + podNamespace string -// Internal function to create an event store. This is called directly by unit tests of -// this module. -func newTestableEventInfoStore(egi eventGetterInterface, retryInterval time.Duration, - timeout time.Duration) *EventInfoStore { - if timeout == -1 { - timeout = timeoutEvRetry - } - if retryInterval == -1 { - retryInterval = minEvRetryInterval - } - ei := &EventInfoStore{getter: egi, firstID: 1, timeout: timeout, retryInterval: retryInterval} - ei.start() - return ei + lock sync.Mutex + collected []EventInfo } // Creates an EventInfoStore that is used to iteratively download events recorded by the -// recordevents pod. Calling this forwards the recordevents port to the local machine -// and blocks waiting to connect to that pod. Fails if it cannot connect within -// the expected timeout (4 minutes currently) -func NewEventInfoStore(client *testlib.Client, podName string) (*EventInfoStore, error) { - egi, err := newEventGetter(podName, client, client.T.Logf) - if err != nil { - return nil, err +// recordevents pod. +func NewEventInfoStore(client *testlib.Client, podName string, podNamespace string) (*EventInfoStore, error) { + store := &EventInfoStore{ + tb: client.T, + podName: podName, + podNamespace: podNamespace, } - ei := newTestableEventInfoStore(egi, -1, -1) - ei.podName = podName - ei.tb = client.T - client.Cleanup(ei.cleanup) - return ei, nil -} - -type EventRecordOption = func(*corev1.Pod, *testlib.Client) error -// Deploys a new recordevents pod and start the associated EventInfoStore -func StartEventRecordOrFail(client *testlib.Client, podName string, options ...EventRecordOption) (*EventInfoStore, *corev1.Pod) { - eventRecordPod := resources.EventRecordPod(podName) - client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(podName))...) - err := pkgTest.WaitForPodRunning(client.Kube, podName, client.Namespace) - if err != nil { - client.T.Fatalf("Failed to start the recordevent pod '%s': %v", podName, errors.WithStack(err)) - } - client.WaitForServiceEndpointsOrFail(podName, 1) + client.EventListener.AddHandler(store.handle) - eventTracker, err := NewEventInfoStore(client, podName) - if err != nil { - client.T.Fatalf("Failed to start the EventInfoStore associated to pod '%s': %v", podName, err) - } - return eventTracker, eventRecordPod + return store, nil } -// Starts the single threaded background goroutine used to update local state -// from the remote REST API. -func (ei *EventInfoStore) start() { - ei.closeCh = make(chan struct{}) - ei.doRefresh = make(chan chan error) - go func() { - for { - select { - case <-ei.closeCh: - ei.getter.cleanup() - return - case replyCh := <-ei.doRefresh: - replyCh <- ei.doRetrieveData() - } - } - }() +func (ei *EventInfoStore) getEventInfo() []EventInfo { + ei.lock.Lock() + defer ei.lock.Unlock() + return ei.collected } -// The data update thread used by the single threaded background goroutine -// for updating data from the REST api. -func (ei *EventInfoStore) doRetrieveData() error { - min, max, err := ei.getter.getMinMax() - if err != nil { - return fmt.Errorf("error getting MinMax %v", err) - } - ei.lock.Lock() - curMin := ei.firstID - curMax := curMin + len(ei.allEvents) - 1 - ei.lock.Unlock() - if min == max+1 { - // Nothing to read or trim - return nil - } else { - if min > curMax+1 { - return fmt.Errorf("mismatched stored max/available min: %d, %d", curMax, min) - } - min = curMax + 1 - // We may have data to read, definitely have data to trim. +func (ei *EventInfoStore) handle(event *corev1.Event) { + // Filter events + if !ei.isMyEvent(event) { + return } - var newEvents []EventInfo - for i := min; i <= max; i++ { - e, err := ei.getter.getEntry(i) - if err != nil { - return fmt.Errorf("error calling getEntry of %d %v", i, err) - } - newEvents = append(newEvents, e) + eventInfo := EventInfo{} + err := json.Unmarshal([]byte(event.Message), &eventInfo) + if err != nil { + ei.tb.Errorf("Received EventInfo that cannot be unmarshalled! %+v", err) + return } - ei.lock.Lock() - ei.allEvents = append(ei.allEvents, newEvents...) - ei.lock.Unlock() - err = ei.getter.trimThrough(max) - return err + ei.lock.Lock() + defer ei.lock.Unlock() + ei.collected = append(ei.collected, eventInfo) } -// Clean up any background resources used by the store. Must be called exactly once after -// the last use. -func (ei *EventInfoStore) cleanup() { - close(ei.closeCh) -} - -//TODO remove it, this is not useful anymore -// Deprecated: you can remove the manual cleanup of the event getter, since now it's done at test tear down automatically -func (ei *EventInfoStore) Cleanup() {} - -// Called internally by functions wanting the current list of all -// known events. This calls for an update from the REST server and -// returns the summary of all locally and remotely known events. -// Returns an error in case of a connection or protocol error. -func (ei *EventInfoStore) refreshData() ([]EventInfo, error) { - var allEvents []EventInfo - replyCh := make(chan error) - ei.doRefresh <- replyCh - err := <-replyCh - if err != nil { - return nil, err - } - ei.lock.Lock() - allEvents = append(allEvents, ei.allEvents...) - ei.lock.Unlock() - return allEvents, nil +func (ei *EventInfoStore) isMyEvent(event *corev1.Event) bool { + return event.Type == corev1.EventTypeNormal && + event.Reason == CloudEventObservedReason && + event.InvolvedObject.Kind == "Pod" && + event.InvolvedObject.Name == ei.podName && + event.InvolvedObject.Namespace == ei.podNamespace } // Find all events received by the recordevents pod that match the provided matchers, @@ -215,10 +123,7 @@ func (ei *EventInfoStore) Find(matchers ...EventInfoMatcher) ([]EventInfo, Searc lastEvents := []EventInfo{} var nonMatchingErrors []error - allEvents, err := ei.refreshData() - if err != nil { - return nil, sInfo, nonMatchingErrors, fmt.Errorf("error getting events %v", err) - } + allEvents := ei.getEventInfo() for i := range allEvents { if err := f(allEvents[i]); err == nil { allMatch = append(allMatch, allEvents[i]) @@ -240,27 +145,31 @@ func (ei *EventInfoStore) Find(matchers ...EventInfoMatcher) ([]EventInfo, Searc // Assert that there are at least min number of match for the provided matchers. // This method fails the test if the assert is not fulfilled. func (ei *EventInfoStore) AssertAtLeast(min int, matchers ...EventInfoMatcher) []EventInfo { + ei.tb.Helper() events, err := ei.waitAtLeastNMatch(AllOf(matchers...), min) if err != nil { ei.tb.Fatalf("Timeout waiting for at least %d matches.\nError: %+v", min, errors.WithStack(err)) } + ei.tb.Logf("Assert passed") return events } // Assert that there are at least min number of matches and at most max number of matches for the provided matchers. // This method fails the test if the assert is not fulfilled. func (ei *EventInfoStore) AssertInRange(min int, max int, matchers ...EventInfoMatcher) []EventInfo { + ei.tb.Helper() events := ei.AssertAtLeast(min, matchers...) if max > 0 && len(events) > max { ei.tb.Fatalf("Assert in range failed: %+v", errors.WithStack(fmt.Errorf("expected <= %d events, saw %d", max, len(events)))) } - + ei.tb.Logf("Assert passed") return events } // Assert that there aren't any matches for the provided matchers. // This method fails the test if the assert is not fulfilled. func (ei *EventInfoStore) AssertNot(matchers ...EventInfoMatcher) []EventInfo { + ei.tb.Helper() res, recentEvents, _, err := ei.Find(matchers...) if err != nil { ei.tb.Fatalf("Unexpected error during find on recordevents '%s': %+v", ei.podName, errors.WithStack(err)) @@ -271,14 +180,17 @@ func (ei *EventInfoStore) AssertNot(matchers ...EventInfoMatcher) []EventInfo { fmt.Errorf("Unexpected matches on recordevents '%s', found: %v. %s", ei.podName, res, &recentEvents)), ) } - + ei.tb.Logf("Assert passed") return res } // Assert that there are exactly n matches for the provided matchers. // This method fails the test if the assert is not fulfilled. func (ei *EventInfoStore) AssertExact(n int, matchers ...EventInfoMatcher) []EventInfo { - return ei.AssertInRange(n, n, matchers...) + ei.tb.Helper() + events := ei.AssertInRange(n, n, matchers...) + ei.tb.Logf("Assert passed") + return events } // Wait a long time (currently 4 minutes) until the provided function matches at least @@ -289,7 +201,7 @@ func (ei *EventInfoStore) waitAtLeastNMatch(f EventInfoMatcher, min int) ([]Even var matchRet []EventInfo var internalErr error - wait.PollImmediate(ei.retryInterval, ei.timeout, func() (bool, error) { + wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { allMatch, sInfo, matchErrs, err := ei.Find(f) if err != nil { internalErr = fmt.Errorf("FAIL MATCHING: unexpected error during find: %v", err) diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/event_log.go b/vendor/knative.dev/eventing/test/lib/recordevents/event_log.go new file mode 100644 index 0000000000..62aba6113b --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/recordevents/event_log.go @@ -0,0 +1,33 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recordevents + +// EventLog is the contract for an event logger to vent an event. +type EventLog interface { + Vent(observed EventInfo) error +} + +type EventLogs []EventLog + +func (e EventLogs) Vent(observed EventInfo) error { + for _, el := range e { + if err := el.Vent(observed); err != nil { + return err + } + } + return nil +} diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/logger_vent/logger.go b/vendor/knative.dev/eventing/test/lib/recordevents/logger_vent/logger.go new file mode 100644 index 0000000000..95d7012f23 --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/recordevents/logger_vent/logger.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logger_vent + +import "knative.dev/eventing/test/lib/recordevents" + +type Logger func(string, ...interface{}) + +func (l Logger) Vent(observed recordevents.EventInfo) error { + l("Event: \n%s", observed.String()) + + return nil +} diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/observer/observer.go b/vendor/knative.dev/eventing/test/lib/recordevents/observer/observer.go new file mode 100644 index 0000000000..58e40a1c52 --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/recordevents/observer/observer.go @@ -0,0 +1,193 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package observer + +import ( + "context" + "net/http" + "strings" + "sync/atomic" + "time" + + "knative.dev/eventing/test/lib/dropevents" + + cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" + cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/kelseyhightower/envconfig" + "go.uber.org/zap" + "knative.dev/pkg/logging" + + "knative.dev/eventing/test/lib/recordevents" +) + +// Observer is the entry point for sinking events into the event log. +type Observer struct { + + // Name is the name of this Observer, used to filter if multiple observers. + Name string + // EventLogs is the list of EventLog implementors to vent observed events. + EventLogs recordevents.EventLogs + + ctx context.Context + seq uint64 + replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) + counter *dropevents.CounterHandler +} + +type envConfig struct { + // ObserverName is used to identify this instance of the observer. + ObserverName string `envconfig:"POD_NAME" default:"observer-default" required:"true"` + + // Reply is used to define if the observer should reply back + Reply bool `envconfig:"REPLY" default:"false" required:"false"` + + // The event type to use in the reply, if enabled + ReplyEventType string `envconfig:"REPLY_EVENT_TYPE" default:"" required:"false"` + + // The event source to use in the reply, if enabled + ReplyEventSource string `envconfig:"REPLY_EVENT_SOURCE" default:"" required:"false"` + + // The event data to use in the reply, if enabled + ReplyEventData string `envconfig:"REPLY_EVENT_DATA" default:"" required:"false"` + + // This string to append in the data field in the reply, if enabled. + // This will threat the data as text/plain field + ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"` + + // If events should be dropped, specify the strategy here. + SkipStrategy string `envconfig:"SKIP_ALGORITHM" default:"" required:"false"` + + // If events should be dropped according to Linear policy, this controls + // how many events are dropped. + SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"` +} + +func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + logging.FromContext(ctx).Fatal("Failed to process env var", err) + } + + logging.FromContext(ctx).Infof("Observer environment configuration: %+v", env) + + var replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) + if env.Reply { + logging.FromContext(ctx).Info("Observer will reply with an event") + replyFunc = ReplyTransformerFunc(env.ReplyEventType, env.ReplyEventSource, env.ReplyEventData, env.ReplyAppendData) + } else { + logging.FromContext(ctx).Info("Observer won't reply with an event") + replyFunc = NoOpReply + } + var counter *dropevents.CounterHandler + + if env.SkipStrategy != "" { + counter = &dropevents.CounterHandler{ + Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter), + } + } else { + counter = &dropevents.CounterHandler{ + // Don't skip anything, since count is 0. nop skipper. + Skipper: dropevents.SkipperAlgorithmWithCount(dropevents.Sequence, 0), + } + } + + return &Observer{ + Name: env.ObserverName, + EventLogs: eventLogs, + ctx: ctx, + replyFunc: replyFunc, + counter: counter, + } +} + +// Start will create the CloudEvents client and start listening for inbound +// HTTP requests. This is a is a blocking call. +func (o *Observer) Start(ctx context.Context, handlerFuncs ...func(handler http.Handler) http.Handler) error { + var handler http.Handler = o + + for _, dec := range handlerFuncs { + handler = dec(handler) + } + + server := &http.Server{Addr: ":8080", Handler: handler} + + go func() { + if err := server.ListenAndServe(); err != nil { + logging.FromContext(ctx).Fatal("Error while starting the HTTP server", err) + } + }() + + <-ctx.Done() + + logging.FromContext(ctx).Info("Closing the HTTP server") + + return server.Close() +} + +func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + m := cloudeventshttp.NewMessageFromHttpRequest(request) + defer m.Finish(nil) + + event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m) + headers := make(http.Header) + for k, v := range request.Header { + if !strings.HasPrefix(k, "Ce-") { + headers[k] = v + } + } + // Host header is removed from the request.Header map by net/http + if request.Host != "" { + headers.Set("Host", request.Host) + } + + eventErrStr := "" + if eventErr != nil { + eventErrStr = eventErr.Error() + } + + shouldSkip := o.counter.Skip() + + eventInfo := recordevents.EventInfo{ + Error: eventErrStr, + Event: event, + HTTPHeaders: headers, + Origin: request.RemoteAddr, + Observer: o.Name, + Time: time.Now(), + Sequence: atomic.AddUint64(&o.seq, 1), + Dropped: shouldSkip, + } + + // We still want to emit the event to make it easier to see what we had oberved, but + // we want to transform it a little bit before emitting so that it does not count + // as the real event that we want to emit. + if shouldSkip { + eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type()) + } + + err := o.EventLogs.Vent(eventInfo) + if err != nil { + logging.FromContext(o.ctx).Fatalw("Error while venting the recorded event", zap.Error(err)) + } + + if shouldSkip { + // Trigger a redelivery + writer.WriteHeader(http.StatusConflict) + } else { + o.replyFunc(o.ctx, writer, eventInfo) + } +} diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/observer/reply.go b/vendor/knative.dev/eventing/test/lib/recordevents/observer/reply.go new file mode 100644 index 0000000000..f60945d1b0 --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/recordevents/observer/reply.go @@ -0,0 +1,90 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package observer + +import ( + "context" + "net/http" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "go.uber.org/zap" + "knative.dev/pkg/logging" + + "knative.dev/eventing/test/lib/recordevents" +) + +func NoOpReply(_ context.Context, writer http.ResponseWriter, _ recordevents.EventInfo) { + writer.WriteHeader(http.StatusAccepted) +} + +func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyEventData string, replyAppendData string) func(context.Context, http.ResponseWriter, recordevents.EventInfo) { + return func(ctx context.Context, writer http.ResponseWriter, info recordevents.EventInfo) { + if info.Error != "" { + writer.WriteHeader(http.StatusBadRequest) + _, _ = writer.Write([]byte(info.Error)) + logging.FromContext(ctx).Warn("Conversion error in the event to send back", info.Error) + return + } + + if info.Event == nil { + writer.WriteHeader(http.StatusBadRequest) + _, _ = writer.Write([]byte("No event!")) + logging.FromContext(ctx).Warn("No event to send back") + return + } + + outputEvent := info.Event.Clone() + + if replyEventSource != "" { + logging.FromContext(ctx).Infof("Setting reply event source '%s'", replyEventSource) + outputEvent.SetSource(replyEventSource) + } + if replyEventType != "" { + logging.FromContext(ctx).Infof("Setting reply event type '%s'", replyEventType) + outputEvent.SetType(replyEventType) + } + if replyEventData != "" { + logging.FromContext(ctx).Infof("Setting reply event data '%s'", replyAppendData) + if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(replyEventData)); err != nil { + logging.FromContext(ctx).Warn("Cannot set the event data") + } + } + if replyAppendData != "" { + var d string + if outputEvent.Data() == nil { + d = replyAppendData + } else { + if err := info.Event.DataAs(&d); err != nil { + logging.FromContext(ctx).Warn("Cannot read the event data as text/plain") + } + d = d + replyAppendData + } + logging.FromContext(ctx).Infof("Setting appended event data '%s'", d) + if err := outputEvent.SetData(cloudevents.TextPlain, d); err != nil { + logging.FromContext(ctx).Warn("Cannot set the event data") + } + } + + logging.FromContext(ctx).Infow("Replying with", zap.Stringer("event", outputEvent)) + err := cehttp.WriteResponseWriter(ctx, binding.ToMessage(&outputEvent), 200, writer) + if err != nil { + logging.FromContext(ctx).Warn("Error while writing the event as response", err) + } + } +} diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/constructor.go b/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/constructor.go new file mode 100644 index 0000000000..438ed043d3 --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/constructor.go @@ -0,0 +1,78 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recorder_vent + +import ( + "context" + "log" + "time" + + "github.com/kelseyhightower/envconfig" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + ref "k8s.io/client-go/tools/reference" + + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/logging" + + "knative.dev/eventing/test/lib/recordevents" +) + +type envConfig struct { + AgentName string `envconfig:"AGENT_NAME" default:"observer-default" required:"true"` + PodName string `envconfig:"POD_NAME" required:"true"` + PodNamespace string `envconfig:"POD_NAMESPACE" required:"true"` + Port int `envconfig:"PORT" default:"8080" required:"true"` +} + +const ( + maxRetry = 5 + sleepDuration = 5 * time.Second +) + +func NewFromEnv(ctx context.Context) recordevents.EventLog { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatal("Failed to process env var", err) + } + + logging.FromContext(ctx).Infof("Recorder vent environment configuration: %+v", env) + + return NewEventLog(ctx, env.AgentName, env.PodName, env.PodNamespace) +} + +func NewEventLog(ctx context.Context, agentName string, podName string, podNamespace string) recordevents.EventLog { + on, err := kubeclient.Get(ctx).CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if err != nil { + logging.FromContext(ctx).Fatal("Error while trying to retrieve the pod", err) + } + + logging.FromContext(ctx).Infof("Going to send events to pod '%s' in namespace '%s'", on.Name, on.Namespace) + + reference, err := ref.GetReference(scheme.Scheme, on) + if err != nil { + logging.FromContext(ctx).Fatalf("Could not construct reference to: '%#v' due to: '%v'", on, err) + } + + return &recorder{ + ctx: ctx, + namespace: podNamespace, + agentName: agentName, + ref: reference, + } +} diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/doc.go b/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/doc.go new file mode 100644 index 0000000000..1a4bcc2907 --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package recorder_vent implements an recordevents.EventLog backed by Kubernetes +// Events using an event recorder. +package recorder_vent diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/recorder.go b/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/recorder.go new file mode 100644 index 0000000000..117ac2df16 --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/recordevents/recorder_vent/recorder.go @@ -0,0 +1,120 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recorder_vent + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + restclient "k8s.io/client-go/rest" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/logging" + + "knative.dev/eventing/test/lib/recordevents" +) + +type recorder struct { + ctx context.Context + namespace string + agentName string + + ref *corev1.ObjectReference +} + +func (r *recorder) Vent(observed recordevents.EventInfo) error { + b, err := json.Marshal(observed) + if err != nil { + return err + } + message := string(b) + + t := time.Now() + // Note: DO NOT SET EventTime, or you'll trigger k8s api server hilarity: + // - https://github.com/kubernetes/kubernetes/issues/95913 + // - https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/validation/events.go#L122 + event := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%d", r.ref.Name, observed.Sequence), + Namespace: r.namespace, + }, + InvolvedObject: *r.ref, + Reason: recordevents.CloudEventObservedReason, + Message: message, + Source: corev1.EventSource{Component: r.agentName}, + FirstTimestamp: metav1.Time{Time: t}, + LastTimestamp: metav1.Time{Time: t}, + Count: 1, + Type: corev1.EventTypeNormal, + } + + return r.recordEvent(event) +} + +func (r *recorder) recordEvent(event *corev1.Event) error { + tries := 0 + for { + done, err := r.trySendEvent(event) + if done { + return nil + } + tries++ + if tries >= maxRetry { + logging.FromContext(r.ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name) + return err + } + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) + } else { + time.Sleep(sleepDuration) + } + } +} + +func (r *recorder) trySendEvent(event *corev1.Event) (bool, error) { + newEv, err := kubeclient.Get(r.ctx).CoreV1().Events(r.namespace).CreateWithEventNamespace(event) + if err == nil { + logging.FromContext(r.ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID) + return true, nil + } + + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + switch err.(type) { + case *restclient.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + logging.FromContext(r.ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err) + return true, err + case *apierrors.StatusError: + logging.FromContext(r.ctx).Errorf("Server rejected event '%s'. Reason: '%v' (will not retry!). Event: %v", event.Name, err, event) + return true, err + case *apierrors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + logging.FromContext(r.ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return false, err +} diff --git a/vendor/knative.dev/eventing/test/lib/recordevents/resources.go b/vendor/knative.dev/eventing/test/lib/recordevents/resources.go new file mode 100644 index 0000000000..e3eb191ff4 --- /dev/null +++ b/vendor/knative.dev/eventing/test/lib/recordevents/resources.go @@ -0,0 +1,152 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recordevents + +import ( + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + pkgtest "knative.dev/pkg/test" + + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/resources" +) + +type EventRecordOption = func(*corev1.Pod, *testlib.Client) error + +// EchoEvent is an option to let the recordevents reply with the received event +func EchoEvent(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + return nil +} + +var _ EventRecordOption = EchoEvent + +// ReplyWithTransformedEvent is an option to let the recordevents reply with the transformed event +func ReplyWithTransformedEvent(replyEventType string, replyEventSource string, replyEventData string) EventRecordOption { + return func(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + if replyEventType != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_TYPE", Value: replyEventType}, + ) + } + if replyEventSource != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_SOURCE", Value: replyEventSource}, + ) + } + if replyEventData != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_DATA", Value: replyEventData}, + ) + } + + return nil + } +} + +// ReplyWithAppendedData is an option to let the recordevents reply with the transformed event with appended data +func ReplyWithAppendedData(appendData string) EventRecordOption { + return func(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + if appendData != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_APPEND_DATA", Value: appendData}, + ) + } + + return nil + } +} + +// DeployEventRecordOrFail deploys the recordevents image with necessary sa, roles, rb to execute the image +func DeployEventRecordOrFail(client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod { + client.CreateServiceAccountOrFail(name) + client.CreateRoleOrFail(resources.Role(name, + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get"}, + }), + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{rbacv1.VerbAll}, + }), + )) + client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace) + + eventRecordPod := recordEventsPod("recordevents", name, name, client.Namespace) + client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...) + err := pkgtest.WaitForPodRunning(client.Kube, name, client.Namespace) + if err != nil { + client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err)) + } + client.WaitForServiceEndpointsOrFail(name, 1) + return eventRecordPod +} + +func recordEventsPod(imageName string, name string, serviceAccountName string, namespace string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: imageName, + Image: pkgtest.ImagePath(imageName), + ImagePullPolicy: corev1.PullAlways, + Env: []corev1.EnvVar{{ + Name: "SYSTEM_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}, + }, + }, { + Name: "POD_NAMESPACE", + Value: namespace, + }, { + Name: "OBSERVER", + Value: "recorder-" + name, + }, { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}, + }, + }}, + }}, + ServiceAccountName: serviceAccountName, + RestartPolicy: corev1.RestartPolicyAlways, + }, + } +} diff --git a/vendor/knative.dev/eventing/test/lib/resources/kube.go b/vendor/knative.dev/eventing/test/lib/resources/kube.go index 2b154fe2b2..d9dc37b90b 100644 --- a/vendor/knative.dev/eventing/test/lib/resources/kube.go +++ b/vendor/knative.dev/eventing/test/lib/resources/kube.go @@ -27,7 +27,6 @@ import ( rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/uuid" pkgTest "knative.dev/pkg/test" ) @@ -37,55 +36,6 @@ type PodOption func(*corev1.Pod) // Option enables further configuration of a Role. type RoleOption func(*rbacv1.Role) -// EventRecordPod creates a Pod that stores received events for test retrieval. -func EventRecordPod(name string) *corev1.Pod { - return eventLoggerPod("recordevents", name) -} - -func eventLoggerPod(imageName string, name string) *corev1.Pod { - return &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: imageName, - Image: pkgTest.ImagePath(imageName), - ImagePullPolicy: corev1.PullAlways, - }}, - RestartPolicy: corev1.RestartPolicyAlways, - }, - } -} - -// EventTransformationPod creates a Pod that transforms events received receiving as arg a cloudevents sdk2 Event -func EventTransformationPod(name string, newEventType string, newEventSource string, newEventData []byte) *corev1.Pod { - const imageName = "transformevents" - return &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: imageName, - Image: pkgTest.ImagePath(imageName), - ImagePullPolicy: corev1.PullAlways, - Args: []string{ - "-event-type", - newEventType, - "-event-source", - newEventSource, - "-event-data", - string(newEventData), - }, - }}, - RestartPolicy: corev1.RestartPolicyAlways, - }, - } -} - // HelloWorldPod creates a Pod that logs "Hello, World!". func HelloWorldPod(name string, options ...PodOption) *corev1.Pod { const imageName = "print" @@ -115,54 +65,6 @@ func WithLabelsForPod(labels map[string]string) PodOption { } } -// SequenceStepperPod creates a Pod that can be used as a step in testing Sequence. -// Note event data used in the test must be BaseData, and this Pod as a Subscriber will receive the event, -// and return a new event with eventMsgAppender added to data.Message. -func SequenceStepperPod(name, eventMsgAppender string) *corev1.Pod { - const imageName = "sequencestepper" - return &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: imageName, - Image: pkgTest.ImagePath(imageName), - ImagePullPolicy: corev1.PullAlways, - Args: []string{ - "-msg-appender", - eventMsgAppender, - }, - }}, - RestartPolicy: corev1.RestartPolicyAlways, - }, - } -} - -// EventFilteringPod creates a Pod that either filter or send the received CloudEvent -func EventFilteringPod(name string, filter bool) *corev1.Pod { - const imageName = "filterevents" - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: imageName, - Image: pkgTest.ImagePath(imageName), - ImagePullPolicy: corev1.PullAlways, - }}, - RestartPolicy: corev1.RestartPolicyAlways, - }, - } - if filter { - pod.Spec.Containers[0].Args = []string{"-filter"} - } - return pod -} - const ( PerfConsumerService = "perf-consumer" PerfAggregatorService = "perf-aggregator" diff --git a/vendor/knative.dev/eventing/test/lib/test_runner.go b/vendor/knative.dev/eventing/test/lib/test_runner.go index 553ebeff08..863453c530 100644 --- a/vendor/knative.dev/eventing/test/lib/test_runner.go +++ b/vendor/knative.dev/eventing/test/lib/test_runner.go @@ -19,6 +19,8 @@ package lib import ( "fmt" "path/filepath" + "sort" + "strings" "testing" "time" @@ -189,8 +191,25 @@ func TearDown(client *Client) { if err != nil { client.T.Logf("Could not list events in the namespace %q: %v", client.Namespace, err) } else { - for _, e := range el.Items { - client.T.Logf("EVENT: %v", e) + // Elements has to be ordered first + items := el.Items + sort.SliceStable(items, func(i, j int) bool { + // Some events might not contain last timestamp, in that case we fallback to event time + iTime := items[i].LastTimestamp.Time + if iTime.IsZero() { + iTime = items[i].EventTime.Time + } + + jTime := items[j].LastTimestamp.Time + if jTime.IsZero() { + jTime = items[j].EventTime.Time + } + + return iTime.Before(jTime) + }) + + for _, e := range items { + client.T.Log(formatEvent(&e)) } } @@ -210,6 +229,27 @@ func TearDown(client *Client) { } } +func formatEvent(e *corev1.Event) string { + return strings.Join([]string{`Event{`, + `ObjectMeta:` + strings.Replace(strings.Replace(e.ObjectMeta.String(), "ObjectMeta", "v1.ObjectMeta", 1), `&`, ``, 1), + `InvolvedObject:` + strings.Replace(strings.Replace(e.InvolvedObject.String(), "ObjectReference", "ObjectReference", 1), `&`, ``, 1), + `Reason:` + e.Reason, + `Message:` + e.Message, + `Source:` + strings.Replace(strings.Replace(e.Source.String(), "EventSource", "EventSource", 1), `&`, ``, 1), + `FirstTimestamp:` + e.FirstTimestamp.String(), + `LastTimestamp:` + e.LastTimestamp.String(), + `Count:` + fmt.Sprintf("%d", e.Count), + `Type:` + e.Type, + `EventTime:` + e.EventTime.String(), + `Series:` + strings.Replace(e.Series.String(), "EventSeries", "EventSeries", 1), + `Action:` + e.Action, + `Related:` + strings.Replace(e.Related.String(), "ObjectReference", "ObjectReference", 1), + `ReportingController:` + e.ReportingController, + `ReportingInstance:` + e.ReportingInstance, + `}`, + }, "\n") +} + // CreateNamespaceIfNeeded creates a new namespace if it does not exist. func CreateNamespaceIfNeeded(t *testing.T, client *Client, namespace string) { _, err := client.Kube.Kube.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}) diff --git a/vendor/knative.dev/eventing/test/test_images/recordevents/eventstore.go b/vendor/knative.dev/eventing/test/test_images/recordevents/eventstore.go deleted file mode 100644 index 470e5adc21..0000000000 --- a/vendor/knative.dev/eventing/test/test_images/recordevents/eventstore.go +++ /dev/null @@ -1,191 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "encoding/json" - "fmt" - "sync" - - cloudevents "github.com/cloudevents/sdk-go/v2" - - "knative.dev/eventing/test/lib/recordevents" -) - -// Number of EventInfo per block -const evBlockSize = 100 - -// Block of stored EventInfo -type eventBlock struct { - // seqno of [0] evInfoBytes entry - firstIndex int - // offset inside block for newly appended event - firstOffsetFree int - // offset inside block of first non-trimmed event - firstValid int - // serialized EventInfo structures for each seqno. We enforce - // that there is always at least one block. - evInfoBytes [evBlockSize][]byte -} - -// All events currently seen -type eventStore struct { - // Blocks of events in increasing sequency number order - evBlocks []*eventBlock - evBlocksLock sync.Mutex -} - -// Create a new event store. -func newEventStore() *eventStore { - es := &eventStore{} - es.evBlocks = []*eventBlock{{}} - - // One block with no entries starting at sequence number 1 - es.evBlocks[0].firstIndex = 1 - es.evBlocks[0].firstOffsetFree = 0 - es.evBlocks[0].firstValid = 0 - return es -} - -// See if there's enough room to append to the current last block. If not, -// append an extra block. -func (es *eventStore) checkAppendBlock() { - if es.evBlocks[len(es.evBlocks)-1].firstOffsetFree == evBlockSize { - newEVBlock := &eventBlock{ - firstIndex: es.evBlocks[len(es.evBlocks)-1].firstIndex + evBlockSize, - } - es.evBlocks = append(es.evBlocks, newEVBlock) - } -} - -// Store the specified event. -func (es *eventStore) StoreEvent(event *cloudevents.Event, evErr error, httpHeaders map[string][]string) { - var evInfo recordevents.EventInfo - var err error - var evInfoBytes []byte - if evErr != nil { - evInfo.HTTPHeaders = httpHeaders - evInfo.Error = evErr.Error() - if evInfo.Error == "" { - evInfo.Error = "Unknown Incoming Error" - } - evInfoBytes, err = json.Marshal(&evInfo) - if err != nil { - panic(fmt.Errorf("unexpected marshal error (%v) (%+v)", err, evInfo)) - } - } else { - evInfo.Event = event - evInfo.HTTPHeaders = httpHeaders - evInfoBytes, err = json.Marshal(&evInfo) - - if err != nil { - evInfo.Event = nil - evInfo.Error = err.Error() - if evInfo.Error == "" { - evInfo.Error = "Unknown Error" - } - evInfoBytes, err = json.Marshal(&evInfo) - if err != nil { - panic(fmt.Errorf("unexpected marshal error (%v) (%+v)", err, evInfo)) - } - } - } - - es.evBlocksLock.Lock() - // Add a new block if we're out of space - es.checkAppendBlock() - - evBlock := es.evBlocks[len(es.evBlocks)-1] - if evBlock.firstOffsetFree < evBlockSize { - evBlock.evInfoBytes[evBlock.firstOffsetFree] = evInfoBytes - evBlock.firstOffsetFree++ - } - - es.evBlocksLock.Unlock() -} - -// Logically trim all events up to and include the provided -// sequence number. Returns error for patently incorrect -// values (negative sequence number or sequence number larger -// than the largest event seen). Trimming already trimmed -// regions is legal. -func (es *eventStore) TrimThrough(through int) error { - es.evBlocksLock.Lock() - defer es.evBlocksLock.Unlock() - minAvail, maxSeen := es.minMaxUnlocked() - if through > maxSeen { - return fmt.Errorf("invalid trim through %d, maxSeen %d", through, maxSeen) - } else if through < 0 { - return fmt.Errorf("invalid trim less than zero %d", through) - } else if through < minAvail { - return nil - } - // Completely remove blocks if they are full and all events in them are less than - // the specified value. - for len(es.evBlocks) > 1 && (es.evBlocks[0].firstIndex+evBlockSize-1) <= through { - es.evBlocks = es.evBlocks[1:] - } - // Logically trim the block split by through. - es.evBlocks[0].firstValid = through - es.evBlocks[0].firstIndex + 1 - return nil -} - -// return min/max untrimmed value when already holding the lock -func (es *eventStore) minMaxUnlocked() (minAvail int, maxSeen int) { - minBlock := es.evBlocks[0] - minAvail = minBlock.firstIndex + (minBlock.firstValid) - - maxBlock := es.evBlocks[len(es.evBlocks)-1] - maxSeen = maxBlock.firstIndex + maxBlock.firstOffsetFree - 1 - return minAvail, maxSeen -} - -// Returns min available value and max seen value for the store. -// min is the minimum value that can be retrieved via GetEntry, or -// if no values can be retrieved, min == max+1. Max starts at 0 -// when no events have been seen. -func (es *eventStore) MinMax() (minAvail int, maxSeen int) { - es.evBlocksLock.Lock() - minAvail, maxSeen = es.minMaxUnlocked() - - es.evBlocksLock.Unlock() - return minAvail, maxSeen -} - -// Get the already serialized EventInfo structure for the provided sequence -// number. -func (es *eventStore) GetEventInfoBytes(seq int) ([]byte, error) { - var evInfoBytes []byte - found := false - - es.evBlocksLock.Lock() - for _, block := range es.evBlocks { - if seq < block.firstIndex+block.firstValid { - break - } - if seq < block.firstIndex+block.firstOffsetFree { - found = true - evInfoBytes = block.evInfoBytes[seq-block.firstIndex] - break - } - } - es.evBlocksLock.Unlock() - if !found { - return evInfoBytes, fmt.Errorf("Invalid sequence number %d", seq) - } - return evInfoBytes, nil -} diff --git a/vendor/knative.dev/eventing/test/test_images/recordevents/main.go b/vendor/knative.dev/eventing/test/test_images/recordevents/main.go index 8dd7411633..95e45db54f 100644 --- a/vendor/knative.dev/eventing/test/test_images/recordevents/main.go +++ b/vendor/knative.dev/eventing/test/test_images/recordevents/main.go @@ -18,163 +18,41 @@ package main import ( "context" - "encoding/json" - "fmt" "log" - "net/http" - "os" - "strconv" - "strings" - cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" - cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" - "go.uber.org/zap" + "k8s.io/client-go/rest" + "knative.dev/pkg/injection" + "knative.dev/pkg/logging" + _ "knative.dev/pkg/system/testing" "knative.dev/eventing/pkg/kncloudevents" - testlib "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/dropevents" - "knative.dev/eventing/test/lib/recordevents" + "knative.dev/eventing/test/lib/recordevents/logger_vent" + "knative.dev/eventing/test/lib/recordevents/observer" + "knative.dev/eventing/test/lib/recordevents/recorder_vent" "knative.dev/eventing/test/test_images" ) -type eventRecorder struct { - es *eventStore -} - -func newEventRecorder() *eventRecorder { - return &eventRecorder{es: newEventStore()} -} - -// Start the recordevents REST api server -func (er *eventRecorder) StartServer(port int) { - http.HandleFunc(recordevents.GetMinMaxPath, er.handleMinMax) - http.HandleFunc(recordevents.GetEntryPath, er.handleGetEntry) - http.HandleFunc(recordevents.TrimThroughPath, er.handleTrim) - go http.ListenAndServe(fmt.Sprintf(":%d", port), nil) -} - -// HTTP handler for GetMinMax requests -func (er *eventRecorder) handleMinMax(w http.ResponseWriter, r *http.Request) { - minAvail, maxSeen := er.es.MinMax() - minMax := recordevents.MinMaxResponse{ - MinAvail: minAvail, - MaxSeen: maxSeen, - } - respBytes, err := json.Marshal(minMax) - if err != nil { - log.Panicf("Internal error: json marshal shouldn't fail: (%v) (%+v)", err, minMax) - } - - w.Header().Set("Content-Type", "text/json") - w.WriteHeader(http.StatusOK) - w.Write(respBytes) -} - -// HTTP handler for TrimThrough requests -func (er *eventRecorder) handleTrim(w http.ResponseWriter, r *http.Request) { - // If we extend this much at all we should vendor a better mux(gorilla, etc) - path := strings.TrimLeft(r.URL.Path, "/") - getPrefix := strings.TrimLeft(recordevents.TrimThroughPath, "/") - suffix := strings.TrimLeft(strings.TrimPrefix(path, getPrefix), "/") - - seqNum, err := strconv.ParseInt(suffix, 10, 32) +func main() { + cfg, err := rest.InClusterConfig() if err != nil { - http.Error(w, "Can't parse event sequence number in request", http.StatusBadRequest) - return + log.Fatal("Error while reading the cfg", err) } + ctx, _ := injection.Default.SetupInformers(context.TODO(), cfg) - err = er.es.TrimThrough(int(seqNum)) - if err != nil { - http.Error(w, "Invalid sequence number in request to trim", http.StatusNotFound) - return + if err := test_images.ConfigureTracing(logging.FromContext(ctx), ""); err != nil { + logging.FromContext(ctx).Fatal("Unable to setup trace publishing", err) } - w.Header().Set("Content-Type", "text/json") - w.WriteHeader(http.StatusOK) -} - -// HTTP handler for GetEntry requests -func (er *eventRecorder) handleGetEntry(w http.ResponseWriter, r *http.Request) { - // If we extend this much at all we should vendor a better mux(gorilla, etc) - path := strings.TrimLeft(r.URL.Path, "/") - getPrefix := strings.TrimLeft(recordevents.GetEntryPath, "/") - suffix := strings.TrimLeft(strings.TrimPrefix(path, getPrefix), "/") + obs := observer.NewFromEnv(ctx, + logger_vent.Logger(logging.FromContext(ctx).Infof), + recorder_vent.NewFromEnv(ctx), + ) - seqNum, err := strconv.ParseInt(suffix, 10, 32) - if err != nil { - http.Error(w, "Can't parse event sequence number in request", http.StatusBadRequest) - return - } + err = obs.Start(ctx, kncloudevents.CreateHandler) - entryBytes, err := er.es.GetEventInfoBytes(int(seqNum)) if err != nil { - http.Error(w, "Couldn't find requested event", http.StatusNotFound) - return + logging.FromContext(ctx).Fatal("Error during start", err) } - w.Header().Set("Content-Type", "text/json") - w.WriteHeader(http.StatusOK) - w.Write(entryBytes) -} - -func (er *eventRecorder) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - m := cloudeventshttp.NewMessageFromHttpRequest(request) - defer m.Finish(nil) - - event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m) - header := request.Header - - er.es.StoreEvent(event, eventErr, map[string][]string(header)) - - headerNameList := testlib.InterestingHeaders() - for _, headerName := range headerNameList { - if headerValue := header.Get(headerName); headerValue != "" { - log.Printf("Header %s: %s\n", headerName, headerValue) - } - } - - if eventErr != nil { - log.Printf("error receiving the event: %v", eventErr) - } else { - valErr := event.Validate() - if valErr == nil { - log.Printf("eventdetails:\n%s", event.String()) - } else { - log.Printf("error validating the event: %v", valErr) - } - } - - writer.WriteHeader(http.StatusAccepted) -} - -func main() { - er := newEventRecorder() - er.StartServer(recordevents.RecordEventsPort) - - logger, _ := zap.NewDevelopment() - if err := test_images.ConfigureTracing(logger.Sugar(), ""); err != nil { - log.Fatalf("Unable to setup trace publishing: %v", err) - } - - algorithm, ok := os.LookupEnv(dropevents.SkipAlgorithmKey) - handler := kncloudevents.CreateHandler(er) - if ok { - skipper := dropevents.SkipperAlgorithm(algorithm) - counter := dropevents.CounterHandler{ - Skipper: skipper, - } - next := handler - handler = http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - if counter.Skip() { - writer.WriteHeader(http.StatusConflict) - return - } - next.ServeHTTP(writer, request) - }) - } - - err := http.ListenAndServe(":8080", handler) - if err != nil { - panic(err) - } + logging.FromContext(ctx).Info("Closing the recordevents process") } diff --git a/vendor/modules.txt b/vendor/modules.txt index 6f20123e5d..2fc681287c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1145,7 +1145,7 @@ k8s.io/utils/buffer k8s.io/utils/integer k8s.io/utils/pointer k8s.io/utils/trace -# knative.dev/eventing v0.17.5-0.20200925065343-049b8e743bd4 +# knative.dev/eventing v0.17.9-0.20201105153307-2fb113c42ff4 ## explicit knative.dev/eventing/pkg/adapter/v2 knative.dev/eventing/pkg/adapter/v2/test @@ -1255,6 +1255,9 @@ knative.dev/eventing/test/lib/dropevents/dropeventsfibonacci knative.dev/eventing/test/lib/dropevents/dropeventsfirst knative.dev/eventing/test/lib/duck knative.dev/eventing/test/lib/recordevents +knative.dev/eventing/test/lib/recordevents/logger_vent +knative.dev/eventing/test/lib/recordevents/observer +knative.dev/eventing/test/lib/recordevents/recorder_vent knative.dev/eventing/test/lib/resources knative.dev/eventing/test/lib/sender knative.dev/eventing/test/performance/infra