Skip to content

Commit

Permalink
Add AMQP support. (#119)
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Nichols <nicholss@google.com>
  • Loading branch information
n3wscott authored and markpeek committed May 3, 2019
1 parent 1e664c4 commit 9396c7c
Show file tree
Hide file tree
Showing 20 changed files with 2,058 additions and 39 deletions.
68 changes: 68 additions & 0 deletions cmd/samples/amqp/receiver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"fmt"
"log"
"os"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport/amqp"
"github.com/kelseyhightower/envconfig"
qp "pack.ag/amqp"
)

type envConfig struct {
// AMQPServer URL to connect to the amqp server.
AMQPServer string `envconfig:"AMQP_SERVER" default:"amqp://localhost:5672/" required:"true"`

// Queue is the amqp queue name to interact with.
Queue string `envconfig:"AMQP_QUEUE"`

AccessKeyName string `envconfig:"AMQP_ACCESS_KEY_NAME" default:"guest"`
AccessKey string `envconfig:"AMQP_ACCESS_KEY" default:"password"`
}

func main() {
var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}
os.Exit(_main(os.Args[1:], env))
}

type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}

func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
fmt.Printf("Got CloudEvent,\n%+v\n", event)
fmt.Println("----------------------------")
return nil
}

func _main(args []string, env envConfig) int {
ctx := context.Background()

t, err := amqp.New(env.AMQPServer, env.Queue,
amqp.WithConnOpt(qp.ConnSASLPlain(env.AccessKeyName, env.AccessKey)),
)
if err != nil {
log.Fatalf("failed to create amqp transport, %s", err.Error())
}
c, err := client.New(t)
if err != nil {
log.Fatalf("failed to create client, %s", err.Error())
}

if err := c.StartReceiver(ctx, receive); err != nil {
log.Fatalf("failed to start amqp receiver, %s", err.Error())
}

// Wait until done.
<-ctx.Done()
return 0
}
118 changes: 118 additions & 0 deletions cmd/samples/amqp/sender/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main

import (
"context"
"fmt"
"log"
"net/url"
"os"
"time"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport/amqp"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
qp "pack.ag/amqp"
)

const (
count = 10
)

type envConfig struct {
// AMQPServer URL to connect to the amqp server.
AMQPServer string `envconfig:"AMQP_SERVER" default:"amqp://localhost:5672/" required:"true"`

// Queue is the amqp queue name to interact with.
Queue string `envconfig:"AMQP_QUEUE"`

AccessKeyName string `envconfig:"AMQP_ACCESS_KEY_NAME" default:"guest"`
AccessKey string `envconfig:"AMQP_ACCESS_KEY" default:"password"`
}

func main() {
var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}
os.Exit(_main(os.Args[1:], env))
}

// Simple holder for the sending sample.
type Demo struct {
Message string
Source url.URL
Target url.URL

Client client.Client
}

// Basic data struct.
type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}

func (d *Demo) Send(eventContext cloudevents.EventContext, i int) (*cloudevents.Event, error) {
event := cloudevents.Event{
Context: eventContext,
Data: &Example{
Sequence: i,
Message: d.Message,
},
}
return d.Client.Send(context.Background(), event)
}

func _main(args []string, env envConfig) int {
source, err := url.Parse("https://github.com/cloudevents/sdk-go/cmd/samples/sender")
if err != nil {
log.Printf("failed to parse source url, %v", err)
return 1
}

seq := 0
contentType := "application/json"
t, err := amqp.New(env.AMQPServer, env.Queue,
amqp.WithConnOpt(qp.ConnSASLPlain(env.AccessKeyName, env.AccessKey)),
)
if err != nil {
log.Printf("failed to create amqp transport, %s", err.Error())
return 1
}
t.Encoding = amqp.BinaryV03
//t.Encoding = amqp.StructuredV02
c, err := client.New(t)
if err != nil {
log.Printf("failed to create client, %s", err.Error())
return 1
}

d := &Demo{
Message: fmt.Sprintf("Hello, %s!", contentType),
Source: *source,
Client: c,
}

for i := 0; i < count; i++ {
now := time.Now()
ctx := cloudevents.EventContextV03{
ID: uuid.New().String(),
Type: "com.cloudevents.sample.sent",
Time: &types.Timestamp{Time: now},
Source: types.URLRef{URL: d.Source},
DataContentType: &contentType,
}.AsV03()
if _, err := d.Send(ctx, seq); err != nil {
log.Printf("failed to send: %v", err)
return 1
}
seq++
time.Sleep(100 * time.Millisecond)
}

return 0
}
10 changes: 8 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
module github.com/cloudevents/sdk-go

require (
github.com/Azure/azure-sdk-for-go v28.1.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.1.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.1.0 // indirect
github.com/Azure/go-autorest/autorest/validation v0.1.0 // indirect
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/google/go-cmp v0.2.0
github.com/google/uuid v1.1.0
github.com/kelseyhightower/envconfig v1.3.0
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.7.0
github.com/nats-io/nkeys v0.0.2 // indirect
github.com/nats-io/nuid v1.0.0 // indirect
go.opencensus.io v0.19.1
go.opencensus.io v0.20.2
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/sys v0.0.0-20190219203350-90b0e4468f99 // indirect
pack.ag/amqp v0.11.0
)
Loading

0 comments on commit 9396c7c

Please sign in to comment.