Skip to content

Commit

Permalink
Renamed pkg/cloudevents/transport/x to pkg/binding (#181)
Browse files Browse the repository at this point in the history
Main types are binding.Message, binding.Sender and binding.Receiver which
reads well in code, corresponds to CE spec use of the term "binding", and
avoids clashes with existing package name "transport".

Part of #180 - The AMQP transport will be re-factored to use this package.

Signed-off-by: Alan Conway <aconway@redhat.com>
  • Loading branch information
alanconway authored and n3wscott committed Sep 9, 2019
1 parent a1c7a75 commit d3eb0a1
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 118 deletions.
25 changes: 25 additions & 0 deletions pkg/binding/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Package binding is for implementing transport bindings and
intermediaries like importers, brokers or channels that forward
messages between bindings.
A transport binding implements Message, Sender and Receiver interfaces.
An intermediary uses those interfaces to transfer event messages.
A Message is an abstract container for an Event. It provides
additional methods for efficient forwarding of structured events
and reliable delivery when the underlying transports support it.
A Receiver can return instances of its own Message implementations. A
Sender must be able to send any implementation of the Message
interface, but it may provide optimized handling for its own Message
implementations.
For transports that support a reliable delivery QoS, the Message
interface allows acknowledgment between sender and receiver for QoS
level 0, 1 or 2. The effective QoS is the lowest provided by sender or
receiver.
*/
package binding
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package x_test
package binding_test

import (
"context"
"encoding/json"
"io"

"github.com/cloudevents/sdk-go/pkg/binding"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport/x"
)

// ExMessage is a json.RawMessage, which is just a byte slice
Expand All @@ -30,30 +30,29 @@ type ExSender struct{ *json.Encoder }

func NewExSender(w io.Writer) ExSender { return ExSender{json.NewEncoder(w)} }

func (s ExSender) Send(ctx context.Context, m x.Message) error {
func (s ExSender) Send(ctx context.Context, m binding.Message) error {
if t, b := m.Structured(); t != "" {
// Fast case: if the Message is already structured JSON we can
// send it directly, no need to decode and re-encode. Encoding a
// json.RawMessage to a json.Encoder() is just a byte-buffer copy.
return s.Encode(json.RawMessage(b))
} else {
// Some other message encoding. Decode as a generic cloudevents.Event
// and then re-encode as JSON
if e, err := m.Event(); err != nil {
return err
} else if err := s.Encode(e); err != nil {
return err
}
return nil
}
// Some other message encoding. Decode as a generic cloudevents.Event
// and then re-encode as JSON
if e, err := m.Event(); err != nil {
return err
} else if err := s.Encode(e); err != nil {
return err
}
return nil
}

// ExReceiver receives by reading JSON encoded events from an io.Reader
type ExReceiver struct{ *json.Decoder }

func NewExReceiver(r io.Reader) ExReceiver { return ExReceiver{json.NewDecoder(r)} }

func (sr ExReceiver) Receive(context.Context) (x.Message, error) {
func (sr ExReceiver) Receive(context.Context) (binding.Message, error) {
var m ExMessage
err := sr.Decode(&m) // This is just a byte copy since m is a json.RawMessage
return m, err
Expand All @@ -62,7 +61,7 @@ func (sr ExReceiver) Receive(context.Context) (x.Message, error) {
// NewExTransport returns a transport.Transport which is implemented by
// an ExSender and an ExReceiver
func NewExTransport(r io.Reader, w io.Writer) transport.Transport {
return x.NewTransport(NewExSender(w), NewExReceiver(r))
return binding.NewTransport(NewExSender(w), NewExReceiver(r))
}

// Example of implementing a transport including a simple message type,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package x_test
package binding_test

import (
"context"
Expand All @@ -10,7 +10,7 @@ import (
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
)

var count = 3 // Example ends after this many events.
const count = 3 // Example ends after this many events.

// The sender uses the cloudevents.Client API, not the transport APIs directly.
func runSender(w io.Writer) error {
Expand Down Expand Up @@ -44,11 +44,11 @@ func runReceiver(r io.Reader) error {
}
return nil
}
if c, err := client.New(NewExTransport(r, nil)); err != nil {
c, err := client.New(NewExTransport(r, nil))
if err != nil {
return err
} else {
return c.StartReceiver(context.TODO(), process)
}
return c.StartReceiver(context.TODO(), process)
}

// The intermediary receives events and forwards them to another
Expand All @@ -61,6 +61,7 @@ func runReceiver(r io.Reader) error {
// reliable delivery.
//
func runIntermediary(r io.Reader, w io.WriteCloser) error {
defer w.Close()
for {
receiver := NewExReceiver(r)
sender := NewExSender(w)
Expand Down
60 changes: 10 additions & 50 deletions pkg/cloudevents/transport/x/message.go → pkg/binding/message.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package x
package binding

import (
"context"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)

// Message is the interface to a transport-specific message containing an event.
Expand Down Expand Up @@ -71,62 +70,23 @@ type ExactlyOnceMessage interface {
// EventMessage wraps a local cloudevents.Event as a Message.
type EventMessage cloudevents.Event

// Event returns the event.
func (m EventMessage) Event() (cloudevents.Event, error) { return cloudevents.Event(m), nil }
func (_ EventMessage) Structured() (string, []byte) { return "", nil }
func (_ EventMessage) Finish(error) {}

// Structured returns ("", nil).
func (EventMessage) Structured() (string, []byte) { return "", nil }

// Finish does nothing.
func (EventMessage) Finish(error) {}

var _ Message = EventMessage{} // Test it conforms to the interface

// Receiver is the receiving half of a transport.
// Receiver receives messages.
type Receiver interface {
Receive(ctx context.Context) (Message, error)
}

// Sender is the sending half of a transport.
// Sender sends messages.
type Sender interface {
Send(ctx context.Context, m Message) error
}

// Transport implements the transport.Transport interface using a
// Sender and Receiver.
type Transport struct {
Sender Sender
Receiver Receiver
handler transport.Receiver
}

var _ transport.Transport = (*Transport)(nil) // Test it conforms to the interface

func NewTransport(s Sender, r Receiver) *Transport {
return &Transport{Sender: s, Receiver: r}
}

func (t *Transport) Send(ctx context.Context, e cloudevents.Event) (context.Context, *cloudevents.Event, error) {
return ctx, nil, t.Sender.Send(ctx, EventMessage(e))
}

func (t *Transport) SetReceiver(r transport.Receiver) { t.handler = r }

func (t *Transport) StartReceiver(ctx context.Context) error {
for {
if m, err := t.Receiver.Receive(ctx); err != nil {
return err
} else if e, err := m.Event(); err != nil {
m.Finish(err)
return err
} else if err := t.handler.Receive(ctx, e, nil); err != nil {
m.Finish(err)
return err
} else {
m.Finish(nil)
}
}
}

func (t *Transport) SetConverter(transport.Converter) {
// TODO(alanconway) Can we separate Converter from the base transport interface?
}

func (t *Transport) HasConverter() bool {
return false
}
52 changes: 52 additions & 0 deletions pkg/binding/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package binding

import (
"context"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)

// Transport implements the transport.Transport interface using a
// Sender and Receiver.
type Transport struct {
Sender Sender
Receiver Receiver
handler transport.Receiver
}

var _ transport.Transport = (*Transport)(nil) // Test it conforms to the interface

func NewTransport(s Sender, r Receiver) *Transport {
return &Transport{Sender: s, Receiver: r}
}

func (t *Transport) Send(ctx context.Context, e cloudevents.Event) (context.Context, *cloudevents.Event, error) {
return ctx, nil, t.Sender.Send(ctx, EventMessage(e))
}

func (t *Transport) SetReceiver(r transport.Receiver) { t.handler = r }

func (t *Transport) StartReceiver(ctx context.Context) error {
for {
if m, err := t.Receiver.Receive(ctx); err != nil {
return err
} else if e, err := m.Event(); err != nil {
m.Finish(err)
return err
} else if err := t.handler.Receive(ctx, e, nil); err != nil {
m.Finish(err)
return err
} else {
m.Finish(nil)
}
}
}

func (t *Transport) SetConverter(transport.Converter) {
// TODO(alanconway) Can we separate Converter from the base transport interface?
}

func (t *Transport) HasConverter() bool {
return false
}
49 changes: 0 additions & 49 deletions pkg/cloudevents/transport/x/doc.go

This file was deleted.

0 comments on commit d3eb0a1

Please sign in to comment.