diff --git a/pkg/binding/doc.go b/pkg/binding/doc.go new file mode 100644 index 000000000..5aa15944b --- /dev/null +++ b/pkg/binding/doc.go @@ -0,0 +1,25 @@ +/* + +Package binding is for implementing transport bindings and +intermediaries like importers, brokers or channels that forward +messages between bindings. + +A transport binding implements Message, Sender and Receiver interfaces. +An intermediary uses those interfaces to transfer event messages. + +A Message is an abstract container for an Event. It provides +additional methods for efficient forwarding of structured events +and reliable delivery when the underlying transports support it. + +A Receiver can return instances of its own Message implementations. A +Sender must be able to send any implementation of the Message +interface, but it may provide optimized handling for its own Message +implementations. + +For transports that support a reliable delivery QoS, the Message +interface allows acknowledgment between sender and receiver for QoS +level 0, 1 or 2. The effective QoS is the lowest provided by sender or +receiver. + +*/ +package binding diff --git a/pkg/cloudevents/transport/x/example_implementing_test.go b/pkg/binding/example_implementing_test.go similarity index 76% rename from pkg/cloudevents/transport/x/example_implementing_test.go rename to pkg/binding/example_implementing_test.go index 0e13e35da..2511db242 100644 --- a/pkg/cloudevents/transport/x/example_implementing_test.go +++ b/pkg/binding/example_implementing_test.go @@ -1,13 +1,13 @@ -package x_test +package binding_test import ( "context" "encoding/json" "io" + "github.com/cloudevents/sdk-go/pkg/binding" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" - "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/x" ) // ExMessage is a json.RawMessage, which is just a byte slice @@ -30,22 +30,21 @@ type ExSender struct{ *json.Encoder } func NewExSender(w io.Writer) ExSender { return ExSender{json.NewEncoder(w)} } -func (s ExSender) Send(ctx context.Context, m x.Message) error { +func (s ExSender) Send(ctx context.Context, m binding.Message) error { if t, b := m.Structured(); t != "" { // Fast case: if the Message is already structured JSON we can // send it directly, no need to decode and re-encode. Encoding a // json.RawMessage to a json.Encoder() is just a byte-buffer copy. return s.Encode(json.RawMessage(b)) - } else { - // Some other message encoding. Decode as a generic cloudevents.Event - // and then re-encode as JSON - if e, err := m.Event(); err != nil { - return err - } else if err := s.Encode(e); err != nil { - return err - } - return nil } + // Some other message encoding. Decode as a generic cloudevents.Event + // and then re-encode as JSON + if e, err := m.Event(); err != nil { + return err + } else if err := s.Encode(e); err != nil { + return err + } + return nil } // ExReceiver receives by reading JSON encoded events from an io.Reader @@ -53,7 +52,7 @@ type ExReceiver struct{ *json.Decoder } func NewExReceiver(r io.Reader) ExReceiver { return ExReceiver{json.NewDecoder(r)} } -func (sr ExReceiver) Receive(context.Context) (x.Message, error) { +func (sr ExReceiver) Receive(context.Context) (binding.Message, error) { var m ExMessage err := sr.Decode(&m) // This is just a byte copy since m is a json.RawMessage return m, err @@ -62,7 +61,7 @@ func (sr ExReceiver) Receive(context.Context) (x.Message, error) { // NewExTransport returns a transport.Transport which is implemented by // an ExSender and an ExReceiver func NewExTransport(r io.Reader, w io.Writer) transport.Transport { - return x.NewTransport(NewExSender(w), NewExReceiver(r)) + return binding.NewTransport(NewExSender(w), NewExReceiver(r)) } // Example of implementing a transport including a simple message type, diff --git a/pkg/cloudevents/transport/x/example_using_test.go b/pkg/binding/example_using_test.go similarity index 93% rename from pkg/cloudevents/transport/x/example_using_test.go rename to pkg/binding/example_using_test.go index 5d829247c..60685602b 100644 --- a/pkg/cloudevents/transport/x/example_using_test.go +++ b/pkg/binding/example_using_test.go @@ -1,4 +1,4 @@ -package x_test +package binding_test import ( "context" @@ -10,7 +10,7 @@ import ( "github.com/cloudevents/sdk-go/pkg/cloudevents/client" ) -var count = 3 // Example ends after this many events. +const count = 3 // Example ends after this many events. // The sender uses the cloudevents.Client API, not the transport APIs directly. func runSender(w io.Writer) error { @@ -44,11 +44,11 @@ func runReceiver(r io.Reader) error { } return nil } - if c, err := client.New(NewExTransport(r, nil)); err != nil { + c, err := client.New(NewExTransport(r, nil)) + if err != nil { return err - } else { - return c.StartReceiver(context.TODO(), process) } + return c.StartReceiver(context.TODO(), process) } // The intermediary receives events and forwards them to another @@ -61,6 +61,7 @@ func runReceiver(r io.Reader) error { // reliable delivery. // func runIntermediary(r io.Reader, w io.WriteCloser) error { + defer w.Close() for { receiver := NewExReceiver(r) sender := NewExSender(w) diff --git a/pkg/cloudevents/transport/x/message.go b/pkg/binding/message.go similarity index 64% rename from pkg/cloudevents/transport/x/message.go rename to pkg/binding/message.go index 146cad356..2dd36f0b9 100644 --- a/pkg/cloudevents/transport/x/message.go +++ b/pkg/binding/message.go @@ -1,10 +1,9 @@ -package x +package binding import ( "context" "github.com/cloudevents/sdk-go/pkg/cloudevents" - "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" ) // Message is the interface to a transport-specific message containing an event. @@ -71,62 +70,23 @@ type ExactlyOnceMessage interface { // EventMessage wraps a local cloudevents.Event as a Message. type EventMessage cloudevents.Event +// Event returns the event. func (m EventMessage) Event() (cloudevents.Event, error) { return cloudevents.Event(m), nil } -func (_ EventMessage) Structured() (string, []byte) { return "", nil } -func (_ EventMessage) Finish(error) {} + +// Structured returns ("", nil). +func (EventMessage) Structured() (string, []byte) { return "", nil } + +// Finish does nothing. +func (EventMessage) Finish(error) {} var _ Message = EventMessage{} // Test it conforms to the interface -// Receiver is the receiving half of a transport. +// Receiver receives messages. type Receiver interface { Receive(ctx context.Context) (Message, error) } -// Sender is the sending half of a transport. +// Sender sends messages. type Sender interface { Send(ctx context.Context, m Message) error } - -// Transport implements the transport.Transport interface using a -// Sender and Receiver. -type Transport struct { - Sender Sender - Receiver Receiver - handler transport.Receiver -} - -var _ transport.Transport = (*Transport)(nil) // Test it conforms to the interface - -func NewTransport(s Sender, r Receiver) *Transport { - return &Transport{Sender: s, Receiver: r} -} - -func (t *Transport) Send(ctx context.Context, e cloudevents.Event) (context.Context, *cloudevents.Event, error) { - return ctx, nil, t.Sender.Send(ctx, EventMessage(e)) -} - -func (t *Transport) SetReceiver(r transport.Receiver) { t.handler = r } - -func (t *Transport) StartReceiver(ctx context.Context) error { - for { - if m, err := t.Receiver.Receive(ctx); err != nil { - return err - } else if e, err := m.Event(); err != nil { - m.Finish(err) - return err - } else if err := t.handler.Receive(ctx, e, nil); err != nil { - m.Finish(err) - return err - } else { - m.Finish(nil) - } - } -} - -func (t *Transport) SetConverter(transport.Converter) { - // TODO(alanconway) Can we separate Converter from the base transport interface? -} - -func (t *Transport) HasConverter() bool { - return false -} diff --git a/pkg/binding/transport.go b/pkg/binding/transport.go new file mode 100644 index 000000000..81c239bf8 --- /dev/null +++ b/pkg/binding/transport.go @@ -0,0 +1,52 @@ +package binding + +import ( + "context" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" +) + +// Transport implements the transport.Transport interface using a +// Sender and Receiver. +type Transport struct { + Sender Sender + Receiver Receiver + handler transport.Receiver +} + +var _ transport.Transport = (*Transport)(nil) // Test it conforms to the interface + +func NewTransport(s Sender, r Receiver) *Transport { + return &Transport{Sender: s, Receiver: r} +} + +func (t *Transport) Send(ctx context.Context, e cloudevents.Event) (context.Context, *cloudevents.Event, error) { + return ctx, nil, t.Sender.Send(ctx, EventMessage(e)) +} + +func (t *Transport) SetReceiver(r transport.Receiver) { t.handler = r } + +func (t *Transport) StartReceiver(ctx context.Context) error { + for { + if m, err := t.Receiver.Receive(ctx); err != nil { + return err + } else if e, err := m.Event(); err != nil { + m.Finish(err) + return err + } else if err := t.handler.Receive(ctx, e, nil); err != nil { + m.Finish(err) + return err + } else { + m.Finish(nil) + } + } +} + +func (t *Transport) SetConverter(transport.Converter) { + // TODO(alanconway) Can we separate Converter from the base transport interface? +} + +func (t *Transport) HasConverter() bool { + return false +} diff --git a/pkg/cloudevents/transport/x/doc.go b/pkg/cloudevents/transport/x/doc.go deleted file mode 100644 index 2475d64b1..000000000 --- a/pkg/cloudevents/transport/x/doc.go +++ /dev/null @@ -1,49 +0,0 @@ -/* - -Package transport/x shows proposed modifications to package transport. - -They are here for discussion purposes, anything that we can agree on -should be merged into the transport package and may involve -incompatible changes. - -Goals - -Support for intermediaries that receive and send events via different -transports, for example adapters, importers, brokers or channels. - -Support for reliable messaging qualities of service between different -transports - -Provide the option of blocking Receive vs. callback SetReceiver. -There are many situations where the blocking style is easier to -implement and work with. - -Overview of changes - -Sender/Receiver interfaces that can be implemented and used separately. -Implementations can easily be combined into a single Transport that does both -(e.g. for use by the client package) - -Message.Event() allows conversion from any message to the common -in-memory Event representation, without knowing its transport/codec of -origin. Explict use of Codecs is not required. (Transports should -still provide a way for users to build/interpret transport messages if they -want to bypass our transport APIs, but that's a separate issue.) - -Message.Structured() optionally allows a structured encoding to be -forwarded without decoding. This is a special case, but an important -one. - -Reliable messaging: Transport implementation must provide the actual -reliability (resends, storing delivery state etc.) the Message -interface allows hand-over responsibility between a Receiver and a -Sender on different transports so we don't create a "hole" in the -guarantee. The overall guarantee is the weaker of the sender and -receiver. - -No reliability examples yet, wanted to get some feedback before going further. -*/ -package x - -// FIXME(alanconway) review - check for "2" suffix -// Finish comments.