diff --git a/go.mod b/go.mod index 0a444df3f9..0651ce2412 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/IBM/sarama v1.41.2 github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.3.1-0.20230918062809-28780a762825 - github.com/cloudevents/sdk-go/v2 v2.13.0 + github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/google/go-cmp v0.6.0 github.com/google/gofuzz v1.2.0 github.com/google/uuid v1.3.1 diff --git a/go.sum b/go.sum index 7111fc69b1..2d0662b195 100644 --- a/go.sum +++ b/go.sum @@ -129,8 +129,8 @@ github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.3.1-0.20230918062809-2 github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.3.1-0.20230918062809-28780a762825/go.mod h1:hh+AlY88cpaWzAJXrcTer/IEqOjZoHc/nQOYmnRZgqE= github.com/cloudevents/sdk-go/sql/v2 v2.13.0 h1:gMJvQ3XFkygY9JmrusgK80d9yRAb8+J3X8IA1OC+oc0= github.com/cloudevents/sdk-go/sql/v2 v2.13.0/go.mod h1:XZRQBCgRreddIpQrdjBJQUrRg3BCs3aikplJQkHrK44= -github.com/cloudevents/sdk-go/v2 v2.13.0 h1:2zxDS8RyY1/wVPULGGbdgniGXSzLaRJVl136fLXGsYw= -github.com/cloudevents/sdk-go/v2 v2.13.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To= +github.com/cloudevents/sdk-go/v2 v2.15.2 h1:54+I5xQEnI73RBhWHxbI1XJcqOFOVJN85vb41+8mHUc= +github.com/cloudevents/sdk-go/v2 v2.15.2/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= diff --git a/vendor/github.com/cloudevents/sdk-go/v2/alias.go b/vendor/github.com/cloudevents/sdk-go/v2/alias.go index ed64b4c0cc..2fbfaa9a78 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/alias.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/alias.go @@ -135,8 +135,14 @@ var ( ToMessage = binding.ToMessage // Event Creation - NewEventFromHTTPRequest = http.NewEventFromHTTPRequest - NewEventFromHTTPResponse = http.NewEventFromHTTPResponse + + NewEventFromHTTPRequest = http.NewEventFromHTTPRequest + NewEventFromHTTPResponse = http.NewEventFromHTTPResponse + NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest + NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse + NewHTTPRequestFromEvent = http.NewHTTPRequestFromEvent + NewHTTPRequestFromEvents = http.NewHTTPRequestFromEvents + IsHTTPBatch = http.IsHTTPBatch // HTTP Messages diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/doc.go index 8fa999789f..ff92f68368 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/doc.go @@ -4,7 +4,6 @@ */ /* - Package binding defines interfaces for protocol bindings. NOTE: Most applications that emit or consume events should use the ../client @@ -16,11 +15,11 @@ Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events. -Protocol Bindings +# Protocol Bindings A protocol binding usually implements a Message, a Sender and Receiver, a StructuredWriter and a BinaryWriter (depending on the supported encodings of the protocol) and an Write[ProtocolMessage] method. -Read and write events +# Read and write events The core of this package is the binding.Message interface. Through binding.MessageReader It defines how to read a protocol specific message for an @@ -49,7 +48,7 @@ The binding.Write method tries to preserve the structured/binary encoding, in or Messages can be eventually wrapped to change their behaviours and binding their lifecycle, like the binding.FinishMessage. Every Message wrapper implements the MessageWrapper interface -Sender and Receiver +# Sender and Receiver A Receiver receives protocol specific messages and wraps them to into binding.Message implementations. @@ -60,9 +59,8 @@ Message and ExactlyOnceMessage provide methods to allow acknowledgments to propagate when a reliable messages is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported. -Transport +# Transport A binding implementation providing Sender and Receiver implementations can be used as a Transport through the BindingTransport adapter. - */ package binding diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go index 16611a3d75..bb8f914248 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go @@ -11,14 +11,17 @@ import "errors" type Encoding int const ( - // Binary encoding as specified in https://github.com/cloudevents/spec/blob/master/spec.md#message + // Binary encoding as specified in https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message EncodingBinary Encoding = iota - // Structured encoding as specified in https://github.com/cloudevents/spec/blob/master/spec.md#message + // Structured encoding as specified in https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message EncodingStructured // Message is an instance of EventMessage or it contains EventMessage nested (through MessageWrapper) EncodingEvent // When the encoding is unknown (which means that the message is a non-event) EncodingUnknown + + // EncodingBatch is an instance of JSON Batched Events + EncodingBatch ) func (e Encoding) String() string { @@ -29,6 +32,8 @@ func (e Encoding) String() string { return "structured" case EncodingEvent: return "event" + case EncodingBatch: + return "batch" case EncodingUnknown: return "unknown" } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/event_message.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/event_message.go index f82c729c44..83d613af41 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/event_message.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/event_message.go @@ -22,7 +22,9 @@ const ( // EventMessage type-converts a event.Event object to implement Message. // This allows local event.Event objects to be sent directly via Sender.Send() -// s.Send(ctx, binding.EventMessage(e)) +// +// s.Send(ctx, binding.EventMessage(e)) +// // When an event is wrapped into a EventMessage, the original event could be // potentially mutated. If you need to use the Event again, after wrapping it into // an Event message, you should copy it before diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/format/format.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/format/format.go index 2d840025ea..6bdd1842b7 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/format/format.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/format/format.go @@ -7,6 +7,7 @@ package format import ( "encoding/json" + "errors" "fmt" "strings" @@ -41,12 +42,33 @@ func (jsonFmt) Unmarshal(b []byte, e *event.Event) error { return json.Unmarshal(b, e) } +// JSONBatch is the built-in "application/cloudevents-batch+json" format. +var JSONBatch = jsonBatchFmt{} + +type jsonBatchFmt struct{} + +func (jb jsonBatchFmt) MediaType() string { + return event.ApplicationCloudEventsBatchJSON +} + +// Marshal will return an error for jsonBatchFmt since the Format interface doesn't support batch Marshalling, and we +// know it's structured batch json, we'll go direct to the json.UnMarshall() (see `ToEvents()`) since that is the best +// way to support batch operations for now. +func (jb jsonBatchFmt) Marshal(e *event.Event) ([]byte, error) { + return nil, errors.New("not supported for batch events") +} + +func (jb jsonBatchFmt) Unmarshal(b []byte, e *event.Event) error { + return errors.New("not supported for batch events") +} + // built-in formats var formats map[string]Format func init() { formats = map[string]Format{} Add(JSON) + Add(JSONBatch) } // Lookup returns the format for contentType, or nil if not found. diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go index e30e150c02..2fb136c62d 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go @@ -66,7 +66,7 @@ type MessageMetadataReader interface { // Message is the interface to a binding-specific message containing an event. // -// Reliable Delivery +// # Reliable Delivery // // There are 3 reliable qualities of service for messages: // diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/spec/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/spec/doc.go index 44c0b3145b..da5bc9f854 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/spec/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/spec/doc.go @@ -8,6 +8,5 @@ Package spec provides spec-version metadata. For use by code that maps events using (prefixed) attribute name strings. Supports handling multiple spec versions uniformly. - */ package spec diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/to_event.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/to_event.go index 339a7833c3..d3332c1580 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/to_event.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/to_event.go @@ -8,6 +8,7 @@ package binding import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -21,6 +22,9 @@ import ( // ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails var ErrCannotConvertToEvent = errors.New("cannot convert message to event") +// ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails +var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events") + // ToEvent translates a Message with a valid Structured or Binary representation to an Event. // This function returns the Event generated from the Message and the original encoding of the message or // an error that points the conversion error. @@ -61,6 +65,21 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder) } +// ToEvents translates a Batch Message and corresponding Reader data to a slice of Events. +// This function returns the Events generated from the body data, or an error that points +// to the conversion issue. +func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error) { + messageEncoding := message.ReadEncoding() + if messageEncoding != EncodingBatch { + return nil, ErrCannotConvertToEvents + } + + // Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the + // json.UnMarshall(), since that is the best way to support batch operations for now. + var events []event.Event + return events, json.NewDecoder(body).Decode(&events) +} + type messageToEventBuilder event.Event var _ StructuredWriter = (*messageToEventBuilder)(nil) diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go index ea8fbfbb4d..452304ffdf 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go @@ -98,6 +98,7 @@ type ceClient struct { eventDefaulterFns []EventDefaulter pollGoroutines int blockingCallback bool + ackMalformedEvent bool } func (c *ceClient) applyOptions(opts ...Option) error { @@ -202,7 +203,13 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { return fmt.Errorf("client already has a receiver") } - invoker, err := newReceiveInvoker(fn, c.observabilityService, c.inboundContextDecorators, c.eventDefaulterFns...) + invoker, err := newReceiveInvoker( + fn, + c.observabilityService, + c.inboundContextDecorators, + c.eventDefaulterFns, + c.ackMalformedEvent, + ) if err != nil { return err } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go b/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go index 94a4b4e65e..672581b580 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go @@ -14,7 +14,7 @@ import ( ) func NewHTTPReceiveHandler(ctx context.Context, p *thttp.Protocol, fn interface{}) (*EventReceiver, error) { - invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil) //TODO(slinkydeveloper) maybe not nil? + invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil, nil, false) //TODO(slinkydeveloper) maybe not nil? if err != nil { return nil, err } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go index 403fb0f559..a3080b007a 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go @@ -23,11 +23,18 @@ type Invoker interface { var _ Invoker = (*receiveInvoker)(nil) -func newReceiveInvoker(fn interface{}, observabilityService ObservabilityService, inboundContextDecorators []func(context.Context, binding.Message) context.Context, fns ...EventDefaulter) (Invoker, error) { +func newReceiveInvoker( + fn interface{}, + observabilityService ObservabilityService, + inboundContextDecorators []func(context.Context, binding.Message) context.Context, + fns []EventDefaulter, + ackMalformedEvent bool, +) (Invoker, error) { r := &receiveInvoker{ eventDefaulterFns: fns, observabilityService: observabilityService, inboundContextDecorators: inboundContextDecorators, + ackMalformedEvent: ackMalformedEvent, } if fn, err := receiver(fn); err != nil { @@ -44,6 +51,7 @@ type receiveInvoker struct { observabilityService ObservabilityService eventDefaulterFns []EventDefaulter inboundContextDecorators []func(context.Context, binding.Message) context.Context + ackMalformedEvent bool } func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn protocol.ResponseFn) (err error) { @@ -58,13 +66,13 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p switch { case eventErr != nil && r.fn.hasEventIn: r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr) - return respFn(ctx, nil, protocol.NewReceipt(false, "failed to convert Message to Event: %w", eventErr)) + return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr)) case r.fn != nil: // Check if event is valid before invoking the receiver function if e != nil { if validationErr := e.Validate(); validationErr != nil { r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr) - return respFn(ctx, nil, protocol.NewReceipt(false, "validation error in incoming event: %w", validationErr)) + return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr)) } } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go index 938478162b..44394be349 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go @@ -126,3 +126,16 @@ func WithBlockingCallback() Option { return nil } } + +// WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged +// rather than being permanently not-acknowledged. This can be useful when a protocol does not +// provide a responder implementation and would otherwise cause the receiver to be partially or +// fully stuck. +func WithAckMalformedEvent() Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.ackMalformedEvent = true + } + return nil + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go b/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go index b1ab532d79..2cc0e64974 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go @@ -57,7 +57,6 @@ var ( // * func(event.Event) (*event.Event, protocol.Result) // * func(context.Context, event.Event) *event.Event // * func(context.Context, event.Event) (*event.Event, protocol.Result) -// func receiver(fn interface{}) (*receiverFn, error) { fnType := reflect.TypeOf(fn) if fnType.Kind() != reflect.Func { diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/event.go b/vendor/github.com/cloudevents/sdk-go/v2/event/event.go index 94b5aa0ada..52495f9a39 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/event.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/event.go @@ -55,13 +55,12 @@ func New(version ...string) Event { // Use functions in the types package to convert extension values. // For example replace this: // -// var i int -// err := e.ExtensionAs("foo", &i) +// var i int +// err := e.ExtensionAs("foo", &i) // // With this: // -// i, err := types.ToInteger(e.Extensions["foo"]) -// +// i, err := types.ToInteger(e.Extensions["foo"]) func (e Event) ExtensionAs(name string, obj interface{}) error { return e.Context.ExtensionAs(name, obj) } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go index c511c81c45..3f0505547c 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go @@ -179,7 +179,8 @@ func (ec EventContextV03) AsV1() *EventContextV1 { } // Validate returns errors based on requirements from the CloudEvents spec. -// For more details, see https://github.com/cloudevents/spec/blob/master/spec.md +// For more details, see +// https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md // As of Feb 26, 2019, commit 17c32ea26baf7714ad027d9917d03d2fff79fc7e // + https://github.com/cloudevents/spec/pull/387 -> datacontentencoding // + https://github.com/cloudevents/spec/pull/406 -> subject diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/extensions.go b/vendor/github.com/cloudevents/sdk-go/v2/event/extensions.go index 6c4193f348..72d0e757aa 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/extensions.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/extensions.go @@ -50,7 +50,7 @@ func validateExtensionName(key string) error { for _, c := range key { if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')) { - return errors.New("bad key, CloudEvents attribute names MUST consist of lower-case letters ('a' to 'z') or digits ('0' to '9') from the ASCII character set") + return errors.New("bad key, CloudEvents attribute names MUST consist of lower-case letters ('a' to 'z'), upper-case letters ('A' to 'Z') or digits ('0' to '9') from the ASCII character set") } } return nil diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/doc.go index f826a1841d..3c771fc5c4 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/doc.go @@ -21,6 +21,5 @@ Available protocols: * Nats * Nats Streaming (stan) * Google PubSub - */ package protocol diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go index 0eec396a1e..e973738c6c 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go @@ -24,7 +24,7 @@ type RequestData struct { } // WithRequestDataAtContext uses the http.Request to add RequestData -// information to the Context. +// information to the Context. func WithRequestDataAtContext(ctx context.Context, r *nethttp.Request) context.Context { if r == nil { return ctx diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/message.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/message.go index e7e51d034b..7a7c36f9b1 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/message.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/message.go @@ -92,6 +92,9 @@ func (m *Message) ReadEncoding() binding.Encoding { return binding.EncodingBinary } if m.format != nil { + if m.format == format.JSONBatch { + return binding.EncodingBatch + } return binding.EncodingStructured } return binding.EncodingUnknown diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go index 5e400905a7..6582af3eaf 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go @@ -158,7 +158,6 @@ func WithMethod(method string) Option { } } -// // Middleware is a function that takes an existing http.Handler and wraps it in middleware, // returning the wrapped http.Handler. type Middleware func(next nethttp.Handler) nethttp.Handler diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go index dba6fd7baa..7ee3b8fe12 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go @@ -102,7 +102,10 @@ func New(opts ...Option) (*Protocol, error) { } if p.Client == nil { - p.Client = http.DefaultClient + // This is how http.DefaultClient is initialized. We do not just use + // that because when WithRoundTripper is used, it will change the client's + // transport, which would cause that transport to be used process-wide. + p.Client = &http.Client{} } if p.roundTripper != nil { diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go index 71e7346f30..21fc7e9b3b 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go @@ -10,9 +10,7 @@ import ( "context" "errors" "io" - "io/ioutil" "net/http" - "net/url" "time" "go.uber.org/zap" @@ -52,7 +50,7 @@ func (p *Protocol) doOnce(req *http.Request) (binding.Message, protocol.Result) } func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParams, req *http.Request) (binding.Message, error) { - then := time.Now() + start := time.Now() retry := 0 results := make([]protocol.Result, 0) @@ -67,7 +65,7 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam cecontext.LoggerFrom(ctx).Warnw("could not close request body", zap.Error(err)) } }() - body, err = ioutil.ReadAll(req.Body) + body, err = io.ReadAll(req.Body) if err != nil { panic(err) } @@ -79,51 +77,34 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam // Fast track common case. if protocol.IsACK(result) { - return msg, NewRetriesResult(result, retry, then, results) + return msg, NewRetriesResult(result, retry, start, results) } - // Try again? - // - // Make sure the error was something we should retry. - - { - var uErr *url.Error - if errors.As(result, &uErr) { - goto DoBackoff + var httpResult *Result + if errors.As(result, &httpResult) { + sc := httpResult.StatusCode + if !p.isRetriableFunc(sc) { + cecontext.LoggerFrom(ctx).Debugw("status code not retryable, will not try again", + zap.Error(httpResult), + zap.Int("statusCode", sc)) + return msg, NewRetriesResult(result, retry, start, results) } } - { - var httpResult *Result - if errors.As(result, &httpResult) { - sc := httpResult.StatusCode - if p.isRetriableFunc(sc) { - // retry! - goto DoBackoff - } else { - // Permanent error - cecontext.LoggerFrom(ctx).Debugw("status code not retryable, will not try again", - zap.Error(httpResult), - zap.Int("statusCode", sc)) - return msg, NewRetriesResult(result, retry, then, results) - } - } - } - - DoBackoff: - resetBody(req, body) - - // Wait for the correct amount of backoff time. - // total tries = retry + 1 - if err := params.Backoff(ctx, retry+1); err != nil { + if err = params.Backoff(ctx, retry+1); err != nil { // do not try again. cecontext.LoggerFrom(ctx).Debugw("backoff error, will not try again", zap.Error(err)) - return msg, NewRetriesResult(result, retry, then, results) + return msg, NewRetriesResult(result, retry, start, results) } retry++ + resetBody(req, body) results = append(results, result) + if msg != nil { + // avoid leak, forget message, ignore error + _ = msg.Finish(nil) + } } } @@ -134,12 +115,12 @@ func resetBody(req *http.Request, body []byte) { return } - req.Body = ioutil.NopCloser(bytes.NewReader(body)) + req.Body = io.NopCloser(bytes.NewReader(body)) // do not modify existing GetBody function if req.GetBody == nil { req.GetBody = func() (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(body)), nil + return io.NopCloser(bytes.NewReader(body)), nil } } } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/utility.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/utility.go index d46a334612..350fc1cf61 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/utility.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/utility.go @@ -6,7 +6,9 @@ package http import ( + "bytes" "context" + "encoding/json" nethttp "net/http" "github.com/cloudevents/sdk-go/v2/binding" @@ -24,3 +26,64 @@ func NewEventFromHTTPResponse(resp *nethttp.Response) (*event.Event, error) { msg := NewMessageFromHttpResponse(resp) return binding.ToEvent(context.Background(), msg) } + +// NewEventsFromHTTPRequest returns a batched set of Events from a HTTP Request +func NewEventsFromHTTPRequest(req *nethttp.Request) ([]event.Event, error) { + msg := NewMessageFromHttpRequest(req) + return binding.ToEvents(context.Background(), msg, msg.BodyReader) +} + +// NewEventsFromHTTPResponse returns a batched set of Events from a HTTP Response +func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) { + msg := NewMessageFromHttpResponse(resp) + return binding.ToEvents(context.Background(), msg, msg.BodyReader) +} + +// NewHTTPRequestFromEvent creates a http.Request object that can be used with any http.Client for a singular event. +// This is an HTTP POST action to the provided url. +func NewHTTPRequestFromEvent(ctx context.Context, url string, event event.Event) (*nethttp.Request, error) { + if err := event.Validate(); err != nil { + return nil, err + } + + req, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, nil) + if err != nil { + return nil, err + } + if err := WriteRequest(ctx, (*binding.EventMessage)(&event), req); err != nil { + return nil, err + } + + return req, nil +} + +// NewHTTPRequestFromEvents creates a http.Request object that can be used with any http.Client for sending +// a batched set of events. This is an HTTP POST action to the provided url. +func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Event) (*nethttp.Request, error) { + // Sending batch events is quite straightforward, as there is only JSON format, so a simple implementation. + for _, e := range events { + if err := e.Validate(); err != nil { + return nil, err + } + } + var buffer bytes.Buffer + err := json.NewEncoder(&buffer).Encode(events) + if err != nil { + return nil, err + } + + request, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, &buffer) + if err != nil { + return nil, err + } + + request.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + + return request, nil +} + +// IsHTTPBatch returns if the current http.Request or http.Response is a batch event operation, by checking the +// header `Content-Type` value. +func IsHTTPBatch(header nethttp.Header) bool { + return header.Get(ContentType) == event.ApplicationCloudEventsBatchJSON +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/write_request.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/write_request.go index 43ad36180c..f22259a3af 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/write_request.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/write_request.go @@ -9,7 +9,6 @@ import ( "bytes" "context" "io" - "io/ioutil" "net/http" "strings" @@ -58,7 +57,7 @@ func (b *httpRequestWriter) SetData(data io.Reader) error { func (b *httpRequestWriter) setBody(body io.Reader) error { rc, ok := body.(io.ReadCloser) if !ok && body != nil { - rc = ioutil.NopCloser(body) + rc = io.NopCloser(body) } b.Body = rc if body != nil { @@ -68,21 +67,21 @@ func (b *httpRequestWriter) setBody(body io.Reader) error { buf := v.Bytes() b.GetBody = func() (io.ReadCloser, error) { r := bytes.NewReader(buf) - return ioutil.NopCloser(r), nil + return io.NopCloser(r), nil } case *bytes.Reader: b.ContentLength = int64(v.Len()) snapshot := *v b.GetBody = func() (io.ReadCloser, error) { r := snapshot - return ioutil.NopCloser(&r), nil + return io.NopCloser(&r), nil } case *strings.Reader: b.ContentLength = int64(v.Len()) snapshot := *v b.GetBody = func() (io.ReadCloser, error) { r := snapshot - return ioutil.NopCloser(&r), nil + return io.NopCloser(&r), nil } default: // This is where we'd set it to -1 (at least @@ -137,5 +136,7 @@ func (b *httpRequestWriter) SetExtension(name string, value interface{}) error { return nil } -var _ binding.StructuredWriter = (*httpRequestWriter)(nil) // Test it conforms to the interface -var _ binding.BinaryWriter = (*httpRequestWriter)(nil) // Test it conforms to the interface +var ( + _ binding.StructuredWriter = (*httpRequestWriter)(nil) // Test it conforms to the interface + _ binding.BinaryWriter = (*httpRequestWriter)(nil) // Test it conforms to the interface +) diff --git a/vendor/github.com/cloudevents/sdk-go/v2/test/event_matchers.go b/vendor/github.com/cloudevents/sdk-go/v2/test/event_matchers.go index 712fc7b0b6..a849cee183 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/test/event_matchers.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/test/event_matchers.go @@ -184,6 +184,18 @@ func HasExtensions(ext map[string]interface{}) EventMatcher { } } +// HasExtensionKeys checks if the event contains the provided keys from its extensions +func HasExtensionKeys(keys []string) EventMatcher { + return func(have event.Event) error { + for _, k := range keys { + if _, ok := have.Extensions()[k]; !ok { + return fmt.Errorf("expecting extension key %q", k) + } + } + return nil + } +} + // HasExtension checks if the event contains the provided extension func HasExtension(key string, value interface{}) EventMatcher { return HasExtensions(map[string]interface{}{key: value}) @@ -277,7 +289,6 @@ func HasAttributeKind(kind spec.Kind, value interface{}) EventMatcher { // LICENSE: MIT License func isEmpty(object interface{}) bool { - // get nil case out of the way if object == nil { return true diff --git a/vendor/github.com/cloudevents/sdk-go/v2/types/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/types/doc.go index cf7a94f35c..3a0a595a1d 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/types/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/types/doc.go @@ -11,25 +11,25 @@ type has a corresponding native Go type and a canonical string encoding. The native Go types used to represent the CloudEvents types are: bool, int32, string, []byte, *url.URL, time.Time - +----------------+----------------+-----------------------------------+ - |CloudEvents Type|Native Type |Convertible From | - +================+================+===================================+ - |Bool |bool |bool | - +----------------+----------------+-----------------------------------+ - |Integer |int32 |Any numeric type with value in | - | | |range of int32 | - +----------------+----------------+-----------------------------------+ - |String |string |string | - +----------------+----------------+-----------------------------------+ - |Binary |[]byte |[]byte | - +----------------+----------------+-----------------------------------+ - |URI-Reference |*url.URL |url.URL, types.URIRef, types.URI | - +----------------+----------------+-----------------------------------+ - |URI |*url.URL |url.URL, types.URIRef, types.URI | - | | |Must be an absolute URI. | - +----------------+----------------+-----------------------------------+ - |Timestamp |time.Time |time.Time, types.Timestamp | - +----------------+----------------+-----------------------------------+ + +----------------+----------------+-----------------------------------+ + |CloudEvents Type|Native Type |Convertible From | + +================+================+===================================+ + |Bool |bool |bool | + +----------------+----------------+-----------------------------------+ + |Integer |int32 |Any numeric type with value in | + | | |range of int32 | + +----------------+----------------+-----------------------------------+ + |String |string |string | + +----------------+----------------+-----------------------------------+ + |Binary |[]byte |[]byte | + +----------------+----------------+-----------------------------------+ + |URI-Reference |*url.URL |url.URL, types.URIRef, types.URI | + +----------------+----------------+-----------------------------------+ + |URI |*url.URL |url.URL, types.URIRef, types.URI | + | | |Must be an absolute URI. | + +----------------+----------------+-----------------------------------+ + |Timestamp |time.Time |time.Time, types.Timestamp | + +----------------+----------------+-----------------------------------+ Extension attributes may be stored as a native type or a canonical string. The To functions will convert to the desired from any convertible type @@ -41,6 +41,5 @@ canonical strings. Note are no Parse or Format functions for URL or string. For URL use the standard url.Parse() and url.URL.String(). The canonical string format of a string is the string itself. - */ package types diff --git a/vendor/github.com/cloudevents/sdk-go/v2/types/value.go b/vendor/github.com/cloudevents/sdk-go/v2/types/value.go index f643d0aa51..14004d3e18 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/types/value.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/types/value.go @@ -86,7 +86,7 @@ func Format(v interface{}) (string, error) { } // Validate v is a valid CloudEvents attribute value, convert it to one of: -// bool, int32, string, []byte, types.URI, types.URIRef, types.Timestamp +// bool, int32, string, []byte, types.URI, types.URIRef, types.Timestamp func Validate(v interface{}) (interface{}, error) { switch v := v.(type) { case bool, int32, string, []byte: @@ -151,7 +151,9 @@ func Validate(v interface{}) (interface{}, error) { } // Clone v clones a CloudEvents attribute value, which is one of the valid types: -// bool, int32, string, []byte, types.URI, types.URIRef, types.Timestamp +// +// bool, int32, string, []byte, types.URI, types.URIRef, types.Timestamp +// // Returns the same type // Panics if the type is not valid func Clone(v interface{}) interface{} { diff --git a/vendor/modules.txt b/vendor/modules.txt index de7275d034..ea09afb8d6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -79,8 +79,8 @@ github.com/cloudevents/sdk-go/sql/v2/gen github.com/cloudevents/sdk-go/sql/v2/parser github.com/cloudevents/sdk-go/sql/v2/runtime github.com/cloudevents/sdk-go/sql/v2/utils -# github.com/cloudevents/sdk-go/v2 v2.13.0 -## explicit; go 1.17 +# github.com/cloudevents/sdk-go/v2 v2.15.2 +## explicit; go 1.18 github.com/cloudevents/sdk-go/v2 github.com/cloudevents/sdk-go/v2/binding github.com/cloudevents/sdk-go/v2/binding/format