From 9396c7c29f83ee72cc817e0853312fa824b388b7 Mon Sep 17 00:00:00 2001 From: Scott Nichols <32305648+n3wscott@users.noreply.github.com> Date: Fri, 3 May 2019 07:44:21 -0700 Subject: [PATCH] Add AMQP support. (#119) Signed-off-by: Scott Nichols --- cmd/samples/amqp/receiver/main.go | 68 +++ cmd/samples/amqp/sender/main.go | 118 +++++ go.mod | 10 +- go.sum | 116 ++++- pkg/cloudevents/event.go | 2 +- pkg/cloudevents/event_interface.go | 3 + pkg/cloudevents/event_reader.go | 5 + pkg/cloudevents/transport/amqp/codec.go | 282 ++++++++++++ pkg/cloudevents/transport/amqp/codec_test.go | 411 ++++++++++++++++++ pkg/cloudevents/transport/amqp/codec_v02.go | 65 +++ .../transport/amqp/codec_v02_test.go | 271 ++++++++++++ pkg/cloudevents/transport/amqp/codec_v03.go | 65 +++ .../transport/amqp/codec_v03_test.go | 271 ++++++++++++ pkg/cloudevents/transport/amqp/doc.go | 4 + pkg/cloudevents/transport/amqp/encoding.go | 62 +++ pkg/cloudevents/transport/amqp/message.go | 45 ++ pkg/cloudevents/transport/amqp/options.go | 51 +++ .../transport/amqp/options_test.go | 48 ++ pkg/cloudevents/transport/amqp/transport.go | 184 ++++++++ pkg/cloudevents/transport/http/codec.go | 16 - 20 files changed, 2058 insertions(+), 39 deletions(-) create mode 100644 cmd/samples/amqp/receiver/main.go create mode 100644 cmd/samples/amqp/sender/main.go create mode 100644 pkg/cloudevents/transport/amqp/codec.go create mode 100644 pkg/cloudevents/transport/amqp/codec_test.go create mode 100644 pkg/cloudevents/transport/amqp/codec_v02.go create mode 100644 pkg/cloudevents/transport/amqp/codec_v02_test.go create mode 100644 pkg/cloudevents/transport/amqp/codec_v03.go create mode 100644 pkg/cloudevents/transport/amqp/codec_v03_test.go create mode 100644 pkg/cloudevents/transport/amqp/doc.go create mode 100644 pkg/cloudevents/transport/amqp/encoding.go create mode 100644 pkg/cloudevents/transport/amqp/message.go create mode 100644 pkg/cloudevents/transport/amqp/options.go create mode 100644 pkg/cloudevents/transport/amqp/options_test.go create mode 100644 pkg/cloudevents/transport/amqp/transport.go diff --git a/cmd/samples/amqp/receiver/main.go b/cmd/samples/amqp/receiver/main.go new file mode 100644 index 000000000..3554369ae --- /dev/null +++ b/cmd/samples/amqp/receiver/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/client" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/amqp" + "github.com/kelseyhightower/envconfig" + qp "pack.ag/amqp" +) + +type envConfig struct { + // AMQPServer URL to connect to the amqp server. + AMQPServer string `envconfig:"AMQP_SERVER" default:"amqp://localhost:5672/" required:"true"` + + // Queue is the amqp queue name to interact with. + Queue string `envconfig:"AMQP_QUEUE"` + + AccessKeyName string `envconfig:"AMQP_ACCESS_KEY_NAME" default:"guest"` + AccessKey string `envconfig:"AMQP_ACCESS_KEY" default:"password"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + os.Exit(_main(os.Args[1:], env)) +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { + fmt.Printf("Got CloudEvent,\n%+v\n", event) + fmt.Println("----------------------------") + return nil +} + +func _main(args []string, env envConfig) int { + ctx := context.Background() + + t, err := amqp.New(env.AMQPServer, env.Queue, + amqp.WithConnOpt(qp.ConnSASLPlain(env.AccessKeyName, env.AccessKey)), + ) + if err != nil { + log.Fatalf("failed to create amqp transport, %s", err.Error()) + } + c, err := client.New(t) + if err != nil { + log.Fatalf("failed to create client, %s", err.Error()) + } + + if err := c.StartReceiver(ctx, receive); err != nil { + log.Fatalf("failed to start amqp receiver, %s", err.Error()) + } + + // Wait until done. + <-ctx.Done() + return 0 +} diff --git a/cmd/samples/amqp/sender/main.go b/cmd/samples/amqp/sender/main.go new file mode 100644 index 000000000..fc55e9e29 --- /dev/null +++ b/cmd/samples/amqp/sender/main.go @@ -0,0 +1,118 @@ +package main + +import ( + "context" + "fmt" + "log" + "net/url" + "os" + "time" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/client" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/amqp" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/google/uuid" + "github.com/kelseyhightower/envconfig" + qp "pack.ag/amqp" +) + +const ( + count = 10 +) + +type envConfig struct { + // AMQPServer URL to connect to the amqp server. + AMQPServer string `envconfig:"AMQP_SERVER" default:"amqp://localhost:5672/" required:"true"` + + // Queue is the amqp queue name to interact with. + Queue string `envconfig:"AMQP_QUEUE"` + + AccessKeyName string `envconfig:"AMQP_ACCESS_KEY_NAME" default:"guest"` + AccessKey string `envconfig:"AMQP_ACCESS_KEY" default:"password"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + os.Exit(_main(os.Args[1:], env)) +} + +// Simple holder for the sending sample. +type Demo struct { + Message string + Source url.URL + Target url.URL + + Client client.Client +} + +// Basic data struct. +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func (d *Demo) Send(eventContext cloudevents.EventContext, i int) (*cloudevents.Event, error) { + event := cloudevents.Event{ + Context: eventContext, + Data: &Example{ + Sequence: i, + Message: d.Message, + }, + } + return d.Client.Send(context.Background(), event) +} + +func _main(args []string, env envConfig) int { + source, err := url.Parse("https://github.com/cloudevents/sdk-go/cmd/samples/sender") + if err != nil { + log.Printf("failed to parse source url, %v", err) + return 1 + } + + seq := 0 + contentType := "application/json" + t, err := amqp.New(env.AMQPServer, env.Queue, + amqp.WithConnOpt(qp.ConnSASLPlain(env.AccessKeyName, env.AccessKey)), + ) + if err != nil { + log.Printf("failed to create amqp transport, %s", err.Error()) + return 1 + } + t.Encoding = amqp.BinaryV03 + //t.Encoding = amqp.StructuredV02 + c, err := client.New(t) + if err != nil { + log.Printf("failed to create client, %s", err.Error()) + return 1 + } + + d := &Demo{ + Message: fmt.Sprintf("Hello, %s!", contentType), + Source: *source, + Client: c, + } + + for i := 0; i < count; i++ { + now := time.Now() + ctx := cloudevents.EventContextV03{ + ID: uuid.New().String(), + Type: "com.cloudevents.sample.sent", + Time: &types.Timestamp{Time: now}, + Source: types.URLRef{URL: d.Source}, + DataContentType: &contentType, + }.AsV03() + if _, err := d.Send(ctx, seq); err != nil { + log.Printf("failed to send: %v", err) + return 1 + } + seq++ + time.Sleep(100 * time.Millisecond) + } + + return 0 +} diff --git a/go.mod b/go.mod index a8bb6457f..699da4902 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,11 @@ module github.com/cloudevents/sdk-go require ( + github.com/Azure/azure-sdk-for-go v28.1.0+incompatible // indirect + github.com/Azure/go-autorest/autorest v0.1.0 // indirect + github.com/Azure/go-autorest/autorest/to v0.1.0 // indirect + github.com/Azure/go-autorest/autorest/validation v0.1.0 // indirect + github.com/fortytw2/leaktest v1.3.0 // indirect github.com/google/go-cmp v0.2.0 github.com/google/uuid v1.1.0 github.com/kelseyhightower/envconfig v1.3.0 @@ -8,10 +13,11 @@ require ( github.com/nats-io/go-nats v1.7.0 github.com/nats-io/nkeys v0.0.2 // indirect github.com/nats-io/nuid v1.0.0 // indirect - go.opencensus.io v0.19.1 + go.opencensus.io v0.20.2 go.uber.org/atomic v1.3.2 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.9.1 - golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 + golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 golang.org/x/sys v0.0.0-20190219203350-90b0e4468f99 // indirect + pack.ag/amqp v0.11.0 ) diff --git a/go.sum b/go.sum index fc74e26f9..b82a4b26c 100644 --- a/go.sum +++ b/go.sum @@ -1,40 +1,83 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -git.apache.org/thrift.git v0.12.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= +contrib.go.opencensus.io/exporter/ocagent v0.4.12 h1:jGFvw3l57ViIVEPKKEUXPcLYIXJmQxLUh6ey1eJhwyc= +contrib.go.opencensus.io/exporter/ocagent v0.4.12/go.mod h1:450APlNTSR6FrvC3CTRqYosuDstRB9un7SOx2k/9ckA= +github.com/Azure/azure-sdk-for-go v28.1.0+incompatible h1:uApF+FNMxRibKyoWxLatbrBJse505r7UVdrOm3dEtfk= +github.com/Azure/azure-sdk-for-go v28.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/go-autorest/autorest v0.1.0 h1:z68s0uL7bVfplrwwCUsYoMezUVQdym6EPOllAT02BtU= +github.com/Azure/go-autorest/autorest v0.1.0/go.mod h1:AKyIcETwSUFxIcs/Wnq/C+kwCtlEYGUVd7FPNb2slmg= +github.com/Azure/go-autorest/autorest/adal v0.1.0 h1:RSw/7EAullliqwkZvgIGDYZWQm1PGKXI8c4aY/87yuU= +github.com/Azure/go-autorest/autorest/adal v0.1.0/go.mod h1:MeS4XhScH55IST095THyTxElntu7WqB7pNbZo8Q5G3E= +github.com/Azure/go-autorest/autorest/date v0.1.0 h1:YGrhWfrgtFs84+h0o46rJrlmsZtyZRg470CqAXTZaGM= +github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA= +github.com/Azure/go-autorest/autorest/mocks v0.1.0 h1:Kx+AUU2Te+A3JIyYn6Dfs+cFgx5XorQKuIXrZGoq/SI= +github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= +github.com/Azure/go-autorest/autorest/to v0.1.0 h1:6Hno3b0XM9ISW4oVsALqfE2WBgZLYxbPldyTuRn6wNk= +github.com/Azure/go-autorest/autorest/to v0.1.0/go.mod h1:GunWKJp1AEqgMaGLV+iocmRAJWqST1wQYhyyjXJ3SJc= +github.com/Azure/go-autorest/autorest/validation v0.1.0 h1:ISSNzGUh+ZSzizJWOWzs8bwpXIePbGLW4z/AmUFGH5A= +github.com/Azure/go-autorest/autorest/validation v0.1.0/go.mod h1:Ha3z/SqBeaalWQvokg3NZAlQTalVMtOIAs1aGK7G6u8= +github.com/Azure/go-autorest/logger v0.1.0 h1:ruG4BSDXONFRrZZJ2GUXDiUyVpayPmb1GnWeHDdaNKY= +github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc= +github.com/Azure/go-autorest/tracing v0.1.0 h1:TRBxC5Pj/fIuh4Qob0ZpkggbfT8RC0SubHbpV3p4/Vc= +github.com/Azure/go-autorest/tracing v0.1.0/go.mod h1:ROEEAFwXycQw7Sn3DXNtEedEvdeRAgDr0izn4z5Ij88= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4= +github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s= github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway v1.6.2/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE= +github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kelseyhightower/envconfig v1.3.0 h1:IvRS4f2VcIQy6j4ORGIf9145T/AsUB+oY8LyvN8BXNM= github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -46,7 +89,11 @@ github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M= github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs= github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/openzipkin/zipkin-go v0.1.3/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -62,13 +109,19 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 h1:/K3IL0Z1quvmJ7X0A1AwNEK7CRkVK3YwfOU/QAL4WGg= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -go.opencensus.io v0.19.1 h1:gPYKQ/GAQYR2ksU+qXNmq3CrOZWT1kkryvW6O0v1acY= -go.opencensus.io v0.19.1/go.mod h1:gug0GbSHa8Pafr0d2urOSgoXHZ6x/RUlaiT0d9pqb4A= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= +go.opencensus.io v0.20.2 h1:NAfh7zF0/3/HqtMvJNZ/RFrSlCE6ZTlHmKfhL/Dm1Jk= +go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -78,45 +131,68 @@ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= -golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181218192612-074acd46bca6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190219203350-90b0e4468f99 h1:mlL4HvR5ojTCLdWRydhoj7jto5SXLsxLc0b1r/3DNlE= golang.org/x/sys v0.0.0-20190219203350-90b0e4468f99/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20181219222714-6e267b5cc78e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -google.golang.org/api v0.0.0-20181220000619-583d854617af/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/api v0.3.1 h1:oJra/lMfmtm13/rgY/8i3MzjFWYXvQIAKjQ3HqofMk8= +google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181219182458-5a97ab628bfb h1:dQshZyyJ5W/Xk8myF4GKBak1pZW6EywJuQ8+44EQhGA= -google.golang.org/genproto v0.0.0-20181219182458-5a97ab628bfb/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg= -google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19 h1:Lj2SnHtxkRGJDqnGaSjo+CCdIieEnwVazbOXILwQemk= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.19.1 h1:TrBcJ1yqAl1G++wO39nD/qtgpsW9/1+QGrluyMGEYgM= +google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +pack.ag/amqp v0.11.0 h1:ot/IA0enDkt4/c8xfbCO7AZzjM4bHys/UffnFmnHUnU= +pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= diff --git a/pkg/cloudevents/event.go b/pkg/cloudevents/event.go index 0f81258d2..9b0b90367 100644 --- a/pkg/cloudevents/event.go +++ b/pkg/cloudevents/event.go @@ -71,7 +71,7 @@ func (e Event) String() string { if e.Data != nil { b.WriteString("Data,\n ") - if strings.HasPrefix(e.DataContentType(), "application/json") { + if strings.HasPrefix(e.DataContentType(), ApplicationJSON) { var prettyJSON bytes.Buffer data, ok := e.Data.([]byte) diff --git a/pkg/cloudevents/event_interface.go b/pkg/cloudevents/event_interface.go index da3890e93..8ca52b04d 100644 --- a/pkg/cloudevents/event_interface.go +++ b/pkg/cloudevents/event_interface.go @@ -29,6 +29,9 @@ type EventReader interface { // Extension Attributes + // Extensions returns the event.Context.GetExtensions(). + Extensions() map[string]interface{} + // ExtensionAs returns event.Context.ExtensionAs(name, obj). ExtensionAs(string, interface{}) error diff --git a/pkg/cloudevents/event_reader.go b/pkg/cloudevents/event_reader.go index ab31257e2..06fc95c83 100644 --- a/pkg/cloudevents/event_reader.go +++ b/pkg/cloudevents/event_reader.go @@ -58,3 +58,8 @@ func (e Event) DataMediaType() string { func (e Event) DataContentEncoding() string { return e.Context.GetDataContentEncoding() } + +// DataContentEncoding implements EventReader.DataContentEncoding +func (e Event) Extensions() map[string]interface{} { + return e.Context.GetExtensions() +} diff --git a/pkg/cloudevents/transport/amqp/codec.go b/pkg/cloudevents/transport/amqp/codec.go new file mode 100644 index 000000000..a04d5cbde --- /dev/null +++ b/pkg/cloudevents/transport/amqp/codec.go @@ -0,0 +1,282 @@ +package amqp + +import ( + "encoding/json" + "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "net/textproto" + "strings" +) + +type Codec struct { + Encoding Encoding + + v02 *CodecV02 + v03 *CodecV03 +} + +const ( + prefix = "cloudEvents:" +) + +var _ transport.Codec = (*Codec)(nil) + +func (c *Codec) Encode(e cloudevents.Event) (transport.Message, error) { + switch c.Encoding { + case Default: + fallthrough + case BinaryV02, BinaryV03: + return c.encodeBinary(e) + case StructuredV02: + if c.v02 == nil { + c.v02 = &CodecV02{Encoding: c.Encoding} + } + return c.v02.Encode(e) + case StructuredV03: + if c.v03 == nil { + c.v03 = &CodecV03{Encoding: c.Encoding} + } + return c.v03.Encode(e) + default: + return nil, fmt.Errorf("unknown encoding: %d", c.Encoding) + } +} + +func (c *Codec) Decode(msg transport.Message) (*cloudevents.Event, error) { + switch encoding := c.inspectEncoding(msg); encoding { + case BinaryV02: + event := cloudevents.New(cloudevents.CloudEventsVersionV02) + return c.decodeBinary(msg, &event) + case BinaryV03: + event := cloudevents.New(cloudevents.CloudEventsVersionV03) + return c.decodeBinary(msg, &event) + case StructuredV02: + if c.v02 == nil { + c.v02 = &CodecV02{Encoding: encoding} + } + return c.v02.Decode(msg) + case StructuredV03: + if c.v03 == nil { + c.v03 = &CodecV03{Encoding: encoding} + } + return c.v03.Decode(msg) + default: + + fmt.Printf("HACKHACKHACK %+v", msg) + + return nil, fmt.Errorf("unknown encoding: %s", encoding) + } +} + +func (c Codec) encodeBinary(e cloudevents.Event) (transport.Message, error) { + headers, err := c.toHeaders(e) + if err != nil { + return nil, err + } + body, err := e.DataBytes() + if err != nil { + return nil, err + } + + msg := &Message{ + ApplicationProperties: headers, + Body: body, + } + + if e.DataContentType() != "" { + msg.ContentType = e.DataContentType() + } else { + msg.ContentType = cloudevents.ApplicationJSON + } + + return msg, nil +} + +func (c Codec) toHeaders(e cloudevents.Event) (map[string]interface{}, error) { + h := make(map[string]interface{}) + h[prefix+"specversion"] = e.SpecVersion() + h[prefix+"type"] = e.Type() + h[prefix+"source"] = e.Source() + h[prefix+"id"] = e.ID() + if !e.Time().IsZero() { + t := types.Timestamp{Time: e.Time()} // TODO: change e.Time() to return string so I don't have to do this. + h[prefix+"time"] = t.String() + } + if e.SchemaURL() != "" { + h[prefix+"schemaurl"] = e.SchemaURL() + } + + for k, v := range e.Extensions() { + if mapVal, ok := v.(map[string]interface{}); ok { + for subkey, subval := range mapVal { + encoded, err := json.Marshal(subval) + if err != nil { + return nil, err + } + h[prefix+k+"-"+subkey] = string(encoded) + } + continue + } + if s, ok := v.(string); ok { + h[prefix+k] = s + continue + } + encoded, err := json.Marshal(v) + if err != nil { + return nil, err + } + h[prefix+k] = string(encoded) + } + + return h, nil +} + +func (c Codec) decodeBinary(msg transport.Message, event *cloudevents.Event) (*cloudevents.Event, error) { + m, ok := msg.(*Message) + if !ok { + return nil, fmt.Errorf("failed to convert transport.Message to amqp.Message") + } + err := c.fromHeaders(m.ApplicationProperties, event) + if err != nil { + return nil, err + } + var body interface{} + if len(m.Body) > 0 { + body = m.Body + } + event.Data = body + event.DataEncoded = true + return event, nil +} + +func (c Codec) fromHeaders(h map[string]interface{}, event *cloudevents.Event) error { + // Normalize headers. + for k, v := range h { + ck := textproto.CanonicalMIMEHeaderKey(k) + if k != ck { + delete(h, k) + h[ck] = v + } + } + + ec := event.Context + + if sv, ok := h[prefix+"specversion"].(string); ok { + if err := ec.SetSpecVersion(sv); err != nil { + return err + } + } + delete(h, prefix+"specversion") + + if id, ok := h[prefix+"id"].(string); ok { + if err := ec.SetID(id); err != nil { + return err + } + } + delete(h, prefix+"id") + + if t, ok := h[prefix+"type"].(string); ok { + if err := ec.SetType(t); err != nil { + return err + } + } + delete(h, prefix+"type") + + if s, ok := h[prefix+"source"].(string); ok { + if err := ec.SetSource(s); err != nil { + return err + } + } + delete(h, prefix+"source") + + if t, ok := h[prefix+"time"].(string); ok { // TODO: time can be empty + timestamp := types.ParseTimestamp(t) + if err := ec.SetTime(timestamp.Time); err != nil { + return err + } + } + delete(h, prefix+"time") + + if t, ok := h[prefix+"schemaurl"].(string); ok { + timestamp := types.ParseTimestamp(t) + if err := ec.SetTime(timestamp.Time); err != nil { + return err + } + } + delete(h, prefix+"schemaurl") + + if s, ok := h[prefix+"subject"].(string); ok { + if err := ec.SetSubject(s); err != nil { + return err + } + } + delete(h, prefix+"subject") + + // At this point, we have deleted all the known headers. + // Everything left is assumed to be an extension. + + extensions := make(map[string]interface{}) + for k, v := range h { + if len(k) > len(prefix) && strings.EqualFold(k[:len(prefix)], prefix) { + ak := strings.ToLower(k[len(prefix):]) + if i := strings.Index(ak, "-"); i > 0 { + // attrib-key + attrib := ak[:i] + key := ak[(i + 1):] + if xv, ok := extensions[attrib]; ok { + if m, ok := xv.(map[string]interface{}); ok { + m[key] = v + continue + } + // TODO: revisit how we want to bubble errors up. + return fmt.Errorf("failed to process map type extension") + } else { + m := make(map[string]interface{}) + m[key] = v + extensions[attrib] = m + } + } else { + // key + var tmp interface{} + if err := json.Unmarshal([]byte(v.(string)), &tmp); err == nil { + extensions[ak] = tmp + } else { + // If we can't unmarshal the data, treat it as a string. + extensions[ak] = v + } + } + } + } + event.Context = ec + if len(extensions) > 0 { + for k, v := range extensions { + event.SetExtension(k, v) + } + } + return nil +} + +func (c *Codec) inspectEncoding(msg transport.Message) Encoding { + if c.v02 == nil { + c.v02 = &CodecV02{Encoding: c.Encoding} + } + // Try v0.2. + encoding := c.v02.inspectEncoding(msg) + if encoding != Unknown { + return encoding + } + + if c.v03 == nil { + c.v03 = &CodecV03{Encoding: c.Encoding} + } + // Try v0.3. + encoding = c.v03.inspectEncoding(msg) + if encoding != Unknown { + return encoding + } + + // We do not understand the message encoding. + return Unknown +} diff --git a/pkg/cloudevents/transport/amqp/codec_test.go b/pkg/cloudevents/transport/amqp/codec_test.go new file mode 100644 index 000000000..a692db687 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/codec_test.go @@ -0,0 +1,411 @@ +package amqp_test + +import ( + "encoding/json" + "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/amqp" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/google/go-cmp/cmp" + "net/url" + "testing" + "time" +) + +func strptr(s string) *string { + return &s +} + +// TODO: Test Binary Mode + +func TestCodecEncode(t *testing.T) { + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URLRef{URL: *sourceUrl} + + testCases := map[string]struct { + codec amqp.Codec + event cloudevents.Event + want *amqp.Message + wantErr error + }{ + "simple v02 structured": { + codec: amqp.Codec{Encoding: amqp.StructuredV02}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV02{ + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "contenttype": "application/json", + "specversion": "0.2", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + got, err := tc.codec.Encode(tc.event) + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + if msg, ok := got.(*amqp.Message); ok { + // It is hard to read the byte dump + want := string(tc.want.Body) + got := string(msg.Body) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + return + } + } + t.Errorf("unexpected message (-want, +got) = %v", diff) + } + }) + } +} + +func TestCodecDecode(t *testing.T) { + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URLRef{URL: *sourceUrl} + + testCases := map[string]struct { + codec amqp.Codec + msg *amqp.Message + want *cloudevents.Event + wantErr error + }{ + "simple v2 structured": { + codec: amqp.Codec{Encoding: amqp.StructuredV02}, + msg: &amqp.Message{ + ContentType: cloudevents.ApplicationCloudEventsJSON, + Body: func() []byte { + body := map[string]interface{}{ + "specversion": "0.2", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + want: &cloudevents.Event{ + Context: &cloudevents.EventContextV02{ + SpecVersion: cloudevents.CloudEventsVersionV02, + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + DataEncoded: true, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + got, err := tc.codec.Decode(tc.msg) + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("unexpected event (-want, +got) = %v", diff) + } + }) + } +} + +type DataExample struct { + AnInt int `json:"a,omitempty" xml:"a,omitempty"` + AString string `json:"b,omitempty" xml:"b,omitempty"` + AnArray []string `json:"c,omitempty" xml:"c,omitempty"` + AMap map[string]map[string]int `json:"d,omitempty" xml:"d,omitempty"` + ATime *time.Time `json:"e,omitempty" xml:"e,omitempty"` +} + +func TestCodecRoundTrip(t *testing.T) { + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URLRef{URL: *sourceUrl} + + for _, encoding := range []amqp.Encoding{amqp.StructuredV02} { + + testCases := map[string]struct { + codec amqp.Codec + event cloudevents.Event + want cloudevents.Event + wantErr error + }{ + "simple data": { + codec: amqp.Codec{Encoding: encoding}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV01{ + EventType: "com.example.test", + Source: *source, + EventID: "ABC-123", + }, + Data: map[string]string{ + "a": "apple", + "b": "banana", + }, + }, + want: cloudevents.Event{ + Context: &cloudevents.EventContextV01{ + CloudEventsVersion: cloudevents.CloudEventsVersionV01, + EventType: "com.example.test", + Source: *source, + EventID: "ABC-123", + ContentType: cloudevents.StringOfApplicationJSON(), + }, + Data: map[string]interface{}{ + "a": "apple", + "b": "banana", + }, + DataEncoded: true, + }, + }, + "struct data": { + codec: amqp.Codec{Encoding: encoding}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV01{ + EventType: "com.example.test", + Source: *source, + EventID: "ABC-123", + }, + Data: DataExample{ + AnInt: 42, + AString: "testing", + }, + }, + want: cloudevents.Event{ + Context: &cloudevents.EventContextV01{ + CloudEventsVersion: cloudevents.CloudEventsVersionV01, + EventType: "com.example.test", + Source: *source, + EventID: "ABC-123", + ContentType: cloudevents.StringOfApplicationJSON(), + }, + Data: &DataExample{ + AnInt: 42, + AString: "testing", + }, + DataEncoded: true, + }, + }, + } + for n, tc := range testCases { + n = fmt.Sprintf("%s, %s", encoding, n) + t.Run(n, func(t *testing.T) { + + msg, err := tc.codec.Encode(tc.event) + if err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + got, err := tc.codec.Decode(msg) + if err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if tc.event.Data != nil { + // We have to be pretty tricky in the test to get the correct type. + data, _ := types.Allocate(tc.want.Data) + err := got.DataAs(&data) + if err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + got.Data = data + } + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + // fix the context back to v1 to test. + ctxv1 := got.Context.AsV01() + got.Context = ctxv1 + + if diff := cmp.Diff(tc.want, *got); diff != "" { + t.Errorf("unexpected event (-want, +got) = %v", diff) + } + }) + } + + } +} + +func TestCodecAsMiddleware(t *testing.T) { + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URLRef{URL: *sourceUrl} + + for _, encoding := range []amqp.Encoding{amqp.StructuredV02} { + + testCases := map[string]struct { + codec amqp.Codec + event cloudevents.Event + want cloudevents.Event + wantErr error + }{ + "simple data": { + codec: amqp.Codec{Encoding: encoding}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV01{ + EventType: "com.example.test", + Source: *source, + EventID: "ABC-123", + }, + Data: map[string]string{ + "a": "apple", + "b": "banana", + }, + }, + want: cloudevents.Event{ + Context: &cloudevents.EventContextV01{ + CloudEventsVersion: cloudevents.CloudEventsVersionV01, + EventType: "com.example.test", + Source: *source, + EventID: "ABC-123", + ContentType: cloudevents.StringOfApplicationJSON(), + }, + Data: map[string]interface{}{ + "a": "apple", + "b": "banana", + }, + DataEncoded: true, + }, + }, + "struct data": { + codec: amqp.Codec{Encoding: encoding}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV01{ + EventType: "com.example.test", + Source: *source, + EventID: "ABC-123", + }, + Data: DataExample{ + AnInt: 42, + AString: "testing", + }, + }, + want: cloudevents.Event{ + Context: &cloudevents.EventContextV01{ + CloudEventsVersion: cloudevents.CloudEventsVersionV01, + EventType: "com.example.test", + Source: *source, + EventID: "ABC-123", + ContentType: cloudevents.StringOfApplicationJSON(), + }, + Data: &DataExample{ + AnInt: 42, + AString: "testing", + }, + DataEncoded: true, + }, + }, + } + for n, tc := range testCases { + n = fmt.Sprintf("%s, %s", encoding, n) + t.Run(n, func(t *testing.T) { + + msg1, err := tc.codec.Encode(tc.event) + if err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + midEvent, err := tc.codec.Decode(msg1) + if err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + msg2, err := tc.codec.Encode(*midEvent) + if err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + got, err := tc.codec.Decode(msg2) + if err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if tc.event.Data != nil { + // We have to be pretty tricky in the test to get the correct type. + data, _ := types.Allocate(tc.want.Data) + err := got.DataAs(&data) + if err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + got.Data = data + } + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + // fix the context back to v1 to test. + ctxv1 := got.Context.AsV01() + got.Context = ctxv1 + + if diff := cmp.Diff(tc.want, *got); diff != "" { + t.Errorf("unexpected event (-want, +got) = %v", diff) + } + }) + } + + } +} + +func toBytes(body map[string]interface{}) []byte { + b, err := json.Marshal(body) + if err != nil { + return []byte(fmt.Sprintf(`{"error":%q}`, err.Error())) + } + return b +} diff --git a/pkg/cloudevents/transport/amqp/codec_v02.go b/pkg/cloudevents/transport/amqp/codec_v02.go new file mode 100644 index 000000000..7f3a3c2c4 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/codec_v02.go @@ -0,0 +1,65 @@ +package amqp + +import ( + "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/codec" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" +) + +type CodecV02 struct { + Encoding Encoding +} + +var _ transport.Codec = (*CodecV02)(nil) + +func (v CodecV02) Encode(e cloudevents.Event) (transport.Message, error) { + switch v.Encoding { + case Default: + fallthrough + case StructuredV02: + return v.encodeStructured(e) + default: + return nil, fmt.Errorf("unknown encoding: %d", v.Encoding) + } +} + +func (v CodecV02) Decode(msg transport.Message) (*cloudevents.Event, error) { + // only structured is supported as of v0.2 + return v.decodeStructured(msg) +} + +func (v CodecV02) encodeStructured(e cloudevents.Event) (transport.Message, error) { + body, err := codec.JsonEncodeV02(e) + if err != nil { + return nil, err + } + return &Message{ + Body: body, + ContentType: cloudevents.ApplicationCloudEventsJSON, + }, nil +} + +func (v CodecV02) decodeStructured(msg transport.Message) (*cloudevents.Event, error) { + m, ok := msg.(*Message) + if !ok { + return nil, fmt.Errorf("failed to convert transport.Message to http.Message") + } + return codec.JsonDecodeV02(m.Body) +} + +func (v CodecV02) inspectEncoding(msg transport.Message) Encoding { + version := msg.CloudEventsVersion() + if version != cloudevents.CloudEventsVersionV02 { + return Unknown + } + m, ok := msg.(*Message) + if !ok { + return Unknown + } + contentType := m.ContentType + if contentType == cloudevents.ApplicationCloudEventsJSON { + return StructuredV02 + } + return BinaryV02 +} diff --git a/pkg/cloudevents/transport/amqp/codec_v02_test.go b/pkg/cloudevents/transport/amqp/codec_v02_test.go new file mode 100644 index 000000000..c40fda644 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/codec_v02_test.go @@ -0,0 +1,271 @@ +package amqp_test + +import ( + "encoding/json" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/amqp" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/google/go-cmp/cmp" + "net/url" + "testing" + "time" +) + +func TestCodecV02_Encode(t *testing.T) { + now := types.Timestamp{Time: time.Now().UTC()} + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URLRef{URL: *sourceUrl} + + schemaUrl, _ := url.Parse("http://example.com/schema") + schema := &types.URLRef{URL: *schemaUrl} + + testCases := map[string]struct { + codec amqp.CodecV02 + event cloudevents.Event + want *amqp.Message + wantErr error + }{ + "simple v2 default": { + codec: amqp.CodecV02{}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV02{ + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "contenttype": "application/json", + "specversion": "0.2", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "full v2 default": { + codec: amqp.CodecV02{}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV02{ + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + SchemaURL: schema, + ContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": "extended", + }, + }, + Data: map[string]interface{}{ + "hello": "world", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "specversion": "0.2", + "contenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "schemaurl": "http://example.com/schema", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "simple v2 structured": { + codec: amqp.CodecV02{Encoding: amqp.StructuredV02}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV02{ + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "contenttype": "application/json", + "specversion": "0.2", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "full v2 structured": { + codec: amqp.CodecV02{Encoding: amqp.StructuredV02}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV02{ + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + SchemaURL: schema, + ContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": "extended", + }, + }, + Data: map[string]interface{}{ + "hello": "world", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "specversion": "0.2", + "contenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "schemaurl": "http://example.com/schema", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + got, err := tc.codec.Encode(tc.event) + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + + if msg, ok := got.(*amqp.Message); ok { + // It is hard to read the byte dump + want := string(tc.want.Body) + got := string(msg.Body) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected message body (-want, +got) = %v", diff) + return + } + } + + t.Errorf("unexpected message (-want, +got) = %v", diff) + } + }) + } +} + +// TODO: figure out extensions for v2 + +func TestCodecV02_Decode(t *testing.T) { + now := types.Timestamp{Time: time.Now()} + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URLRef{URL: *sourceUrl} + + schemaUrl, _ := url.Parse("http://example.com/schema") + schema := &types.URLRef{URL: *schemaUrl} + + testCases := map[string]struct { + codec amqp.CodecV02 + msg *amqp.Message + want *cloudevents.Event + wantErr error + }{ + "simple v2 structured": { + codec: amqp.CodecV02{}, + msg: &amqp.Message{ + Body: toBytes(map[string]interface{}{ + "specversion": "0.2", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + }), + }, + want: &cloudevents.Event{ + Context: &cloudevents.EventContextV02{ + SpecVersion: cloudevents.CloudEventsVersionV02, + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + DataEncoded: true, + }, + }, + "full v2 structured": { + codec: amqp.CodecV02{}, + msg: &amqp.Message{ + Body: toBytes(map[string]interface{}{ + "specversion": "0.2", + "contenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "schemaurl": "http://example.com/schema", + "source": "http://example.com/source", + }), + }, + want: &cloudevents.Event{ + Context: &cloudevents.EventContextV02{ + SpecVersion: cloudevents.CloudEventsVersionV02, + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + SchemaURL: schema, + ContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": json.RawMessage(`"extended"`), + }, + }, + Data: toBytes(map[string]interface{}{ + "hello": "world", + }), + DataEncoded: true, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + got, err := tc.codec.Decode(tc.msg) + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("unexpected event (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/cloudevents/transport/amqp/codec_v03.go b/pkg/cloudevents/transport/amqp/codec_v03.go new file mode 100644 index 000000000..f2af5f244 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/codec_v03.go @@ -0,0 +1,65 @@ +package amqp + +import ( + "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/codec" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" +) + +type CodecV03 struct { + Encoding Encoding +} + +var _ transport.Codec = (*CodecV03)(nil) + +func (v CodecV03) Encode(e cloudevents.Event) (transport.Message, error) { + switch v.Encoding { + case Default: + fallthrough + case StructuredV03: + return v.encodeStructured(e) + default: + return nil, fmt.Errorf("unknown encoding: %d", v.Encoding) + } +} + +func (v CodecV03) Decode(msg transport.Message) (*cloudevents.Event, error) { + // only structured is supported as of v0.3 + return v.decodeStructured(msg) +} + +func (v CodecV03) encodeStructured(e cloudevents.Event) (transport.Message, error) { + body, err := codec.JsonEncodeV03(e) + if err != nil { + return nil, err + } + return &Message{ + Body: body, + ContentType: cloudevents.ApplicationCloudEventsJSON, + }, nil +} + +func (v CodecV03) decodeStructured(msg transport.Message) (*cloudevents.Event, error) { + m, ok := msg.(*Message) + if !ok { + return nil, fmt.Errorf("failed to convert transport.Message to http.Message") + } + return codec.JsonDecodeV03(m.Body) +} + +func (v CodecV03) inspectEncoding(msg transport.Message) Encoding { + version := msg.CloudEventsVersion() + if version != cloudevents.CloudEventsVersionV03 { + return Unknown + } + m, ok := msg.(*Message) + if !ok { + return Unknown + } + contentType := m.ContentType + if contentType == cloudevents.ApplicationCloudEventsJSON { + return StructuredV03 + } + return BinaryV03 +} diff --git a/pkg/cloudevents/transport/amqp/codec_v03_test.go b/pkg/cloudevents/transport/amqp/codec_v03_test.go new file mode 100644 index 000000000..5b687ff92 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/codec_v03_test.go @@ -0,0 +1,271 @@ +package amqp_test + +import ( + "encoding/json" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/amqp" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/google/go-cmp/cmp" + "net/url" + "testing" + "time" +) + +func TestCodecV03_Encode(t *testing.T) { + now := types.Timestamp{Time: time.Now().UTC()} + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URLRef{URL: *sourceUrl} + + schemaUrl, _ := url.Parse("http://example.com/schema") + schema := &types.URLRef{URL: *schemaUrl} + + testCases := map[string]struct { + codec amqp.CodecV03 + event cloudevents.Event + want *amqp.Message + wantErr error + }{ + "simple v0.3 default": { + codec: amqp.CodecV03{}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV03{ + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "datacontenttype": "application/json", + "specversion": "0.3", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "full v0.3 default": { + codec: amqp.CodecV03{}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV03{ + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + SchemaURL: schema, + DataContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": "extended", + }, + }, + Data: map[string]interface{}{ + "hello": "world", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "specversion": "0.3", + "datacontenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "schemaurl": "http://example.com/schema", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "simple v0.3 structured": { + codec: amqp.CodecV03{Encoding: amqp.StructuredV03}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV03{ + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "datacontenttype": "application/json", + "specversion": "0.3", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "full v0.3 structured": { + codec: amqp.CodecV03{Encoding: amqp.StructuredV03}, + event: cloudevents.Event{ + Context: &cloudevents.EventContextV03{ + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + SchemaURL: schema, + DataContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": "extended", + }, + }, + Data: map[string]interface{}{ + "hello": "world", + }, + }, + want: &amqp.Message{ + ContentType: "application/cloudevents+json", + Body: func() []byte { + body := map[string]interface{}{ + "specversion": "0.3", + "datacontenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "schemaurl": "http://example.com/schema", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + got, err := tc.codec.Encode(tc.event) + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + + if msg, ok := got.(*amqp.Message); ok { + // It is hard to read the byte dump + want := string(tc.want.Body) + got := string(msg.Body) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected message body (-want, +got) = %v", diff) + return + } + } + + t.Errorf("unexpected message (-want, +got) = %v", diff) + } + }) + } +} + +// TODO: figure out extensions for v0.3 + +func TestCodecV03_Decode(t *testing.T) { + now := types.Timestamp{Time: time.Now()} + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URLRef{URL: *sourceUrl} + + schemaUrl, _ := url.Parse("http://example.com/schema") + schema := &types.URLRef{URL: *schemaUrl} + + testCases := map[string]struct { + codec amqp.CodecV03 + msg *amqp.Message + want *cloudevents.Event + wantErr error + }{ + "simple v0.3 structured": { + codec: amqp.CodecV03{}, + msg: &amqp.Message{ + Body: toBytes(map[string]interface{}{ + "specversion": "0.3", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + }), + }, + want: &cloudevents.Event{ + Context: &cloudevents.EventContextV03{ + SpecVersion: cloudevents.CloudEventsVersionV03, + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + DataEncoded: true, + }, + }, + "full v0.3 structured": { + codec: amqp.CodecV03{}, + msg: &amqp.Message{ + Body: toBytes(map[string]interface{}{ + "specversion": "0.3", + "datacontenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "schemaurl": "http://example.com/schema", + "source": "http://example.com/source", + }), + }, + want: &cloudevents.Event{ + Context: &cloudevents.EventContextV03{ + SpecVersion: cloudevents.CloudEventsVersionV03, + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + SchemaURL: schema, + DataContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": json.RawMessage(`"extended"`), + }, + }, + Data: toBytes(map[string]interface{}{ + "hello": "world", + }), + DataEncoded: true, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + got, err := tc.codec.Decode(tc.msg) + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("unexpected event (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/cloudevents/transport/amqp/doc.go b/pkg/cloudevents/transport/amqp/doc.go new file mode 100644 index 000000000..75695e17f --- /dev/null +++ b/pkg/cloudevents/transport/amqp/doc.go @@ -0,0 +1,4 @@ +/* +Package amqp implements the CloudEvent transport implementation using amqp. +*/ +package amqp diff --git a/pkg/cloudevents/transport/amqp/encoding.go b/pkg/cloudevents/transport/amqp/encoding.go new file mode 100644 index 000000000..84c134ee6 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/encoding.go @@ -0,0 +1,62 @@ +package amqp + +// Encoding to use for amqp transport. +type Encoding int32 + +const ( + // Default allows amqp transport implementation to pick. + Default Encoding = iota + // BinaryV02 is Binary CloudEvents spec v0.2. + BinaryV02 + // StructuredV02 is Structured CloudEvents spec v0.2. + StructuredV02 + // BinaryV03 is Binary CloudEvents spec v0.3. + BinaryV03 + // StructuredV03 is Structured CloudEvents spec v0.3. + StructuredV03 + // Unknown is unknown. + Unknown +) + +// String pretty-prints the encoding as a string. +func (e Encoding) String() string { + switch e { + case Default: + return "Default Encoding " + e.Version() + + // Binary + case BinaryV02: + fallthrough + case BinaryV03: + return "Binary Encoding " + e.Version() + + // Structured + case StructuredV02: + fallthrough + case StructuredV03: + return "Structured Encoding " + e.Version() + + default: + return "Unknown Encoding" + } +} + +// Version pretty-prints the encoding version as a string. +func (e Encoding) Version() string { + switch e { + + // Version 0.2 + case Default: // <-- Move when a new default is wanted. + fallthrough + case StructuredV02: + return "v0.2" + + // Version 0.3 + case StructuredV03: + return "v0.3" + + // Unknown + default: + return "Unknown" + } +} diff --git a/pkg/cloudevents/transport/amqp/message.go b/pkg/cloudevents/transport/amqp/message.go new file mode 100644 index 000000000..125635dd9 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/message.go @@ -0,0 +1,45 @@ +package amqp + +import ( + "encoding/json" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" +) + +// type check that this transport message impl matches the contract +var _ transport.Message = (*Message)(nil) + +type Message struct { + ContentType string + ApplicationProperties map[string]interface{} + Body []byte +} + +// TODO: update this to work with AMQP +func (m Message) CloudEventsVersion() string { + // Check as Binary encoding first. + if m.ApplicationProperties != nil { + // Binary v0.2, v0.3: + if v := m.ApplicationProperties["cloudEvents:specversion"]; v != nil { + if s, ok := v.(string); ok { + return s + } + } + } + + // Now check as Structured encoding. + raw := make(map[string]json.RawMessage) + if err := json.Unmarshal(m.Body, &raw); err != nil { + return "" + } + + // structured v0.2, v0.3 + if v, ok := raw["specversion"]; ok { + var version string + if err := json.Unmarshal(v, &version); err != nil { + return "" + } + return version + } + + return "" +} diff --git a/pkg/cloudevents/transport/amqp/options.go b/pkg/cloudevents/transport/amqp/options.go new file mode 100644 index 000000000..21b9b741f --- /dev/null +++ b/pkg/cloudevents/transport/amqp/options.go @@ -0,0 +1,51 @@ +package amqp + +import "pack.ag/amqp" + +// Option is the function signature required to be considered an amqp.Option. +type Option func(*Transport) error + +// WithEncoding sets the encoding for amqp transport. +func WithEncoding(encoding Encoding) Option { + return func(t *Transport) error { + t.Encoding = encoding + return nil + } +} + +// WithConnOpt sets a connection option for amqp +func WithConnOpt(opt amqp.ConnOption) Option { + return func(t *Transport) error { + t.connOpts = append(t.connOpts, opt) + return nil + } +} + +// WithConnSASLPlain sets SASLPlain connection option for amqp +func WithConnSASLPlain(username, password string) Option { + return WithConnOpt(amqp.ConnSASLPlain(username, password)) +} + +// WithSessionOpt sets a session option for amqp +func WithSessionOpt(opt amqp.SessionOption) Option { + return func(t *Transport) error { + t.sessionOpts = append(t.sessionOpts, opt) + return nil + } +} + +// WithSenderLinkOption sets a link option for amqp +func WithSenderLinkOption(opt amqp.LinkOption) Option { + return func(t *Transport) error { + t.senderLinkOpts = append(t.senderLinkOpts, opt) + return nil + } +} + +// WithReceiverLinkOption sets a link option for amqp +func WithReceiverLinkOption(opt amqp.LinkOption) Option { + return func(t *Transport) error { + t.receiverLinkOpts = append(t.receiverLinkOpts, opt) + return nil + } +} diff --git a/pkg/cloudevents/transport/amqp/options_test.go b/pkg/cloudevents/transport/amqp/options_test.go new file mode 100644 index 000000000..60573bb02 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/options_test.go @@ -0,0 +1,48 @@ +package amqp + +import ( + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "testing" +) + +func TestWithEncoding(t *testing.T) { + testCases := map[string]struct { + t *Transport + encoding Encoding + want *Transport + wantErr string + }{ + "valid encoding": { + t: &Transport{}, + encoding: StructuredV03, + want: &Transport{ + Encoding: StructuredV03, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + err := tc.t.applyOptions(WithEncoding(tc.encoding)) + + if tc.wantErr != "" || err != nil { + var gotErr string + if err != nil { + gotErr = err.Error() + } + if diff := cmp.Diff(tc.wantErr, gotErr); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + got := tc.t + + if diff := cmp.Diff(tc.want, got, + cmpopts.IgnoreUnexported(Transport{})); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/cloudevents/transport/amqp/transport.go b/pkg/cloudevents/transport/amqp/transport.go new file mode 100644 index 000000000..313e38c79 --- /dev/null +++ b/pkg/cloudevents/transport/amqp/transport.go @@ -0,0 +1,184 @@ +package amqp + +import ( + "context" + "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" + "go.uber.org/zap" + "pack.ag/amqp" +) + +// Transport adheres to transport.Transport. +var _ transport.Transport = (*Transport)(nil) + +// Transport acts as both a http client and a http handler. +type Transport struct { + connOpts []amqp.ConnOption + sessionOpts []amqp.SessionOption + senderLinkOpts []amqp.LinkOption + receiverLinkOpts []amqp.LinkOption + + // Encoding + Encoding Encoding + codec transport.Codec + + // AMQP + Client *amqp.Client + Session *amqp.Session + Sender *amqp.Sender + + Queue string + + // Receiver + Receiver transport.Receiver +} + +// New creates a new amqp transport. +func New(server, queue string, opts ...Option) (*Transport, error) { + t := &Transport{ + Queue: queue, + connOpts: []amqp.ConnOption(nil), + sessionOpts: []amqp.SessionOption(nil), + senderLinkOpts: []amqp.LinkOption(nil), + receiverLinkOpts: []amqp.LinkOption(nil), + } + if err := t.applyOptions(opts...); err != nil { + return nil, err + } + + client, err := amqp.Dial(server, t.connOpts...) + if err != nil { + return nil, err + } + t.Client = client + + // Open a session + session, err := client.NewSession(t.sessionOpts...) + if err != nil { + _ = client.Close() + return nil, err + } + t.Session = session + + t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(queue)) + + // Create a sender + sender, err := session.NewSender(t.senderLinkOpts...) + if err != nil { + _ = client.Close() + _ = session.Close(context.Background()) + return nil, err + } + t.Sender = sender // TODO: in the future we might have more than one sender. + + return t, nil +} + +func (t *Transport) applyOptions(opts ...Option) error { + for _, fn := range opts { + if err := fn(t); err != nil { + return err + } + } + return nil +} + +func (t *Transport) loadCodec() bool { + if t.codec == nil { + switch t.Encoding { + case Default, BinaryV02, StructuredV02, BinaryV03, StructuredV03: + t.codec = &Codec{Encoding: t.Encoding} + default: + return false + } + } + return true +} + +// Send implements Transport.Send +func (t *Transport) Send(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) { + if ok := t.loadCodec(); !ok { + return nil, fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) + } + + msg, err := t.codec.Encode(event) + if err != nil { + return nil, err + } + + if m, ok := msg.(*Message); ok { + // TODO: no response? + return nil, t.Sender.Send(ctx, &amqp.Message{ + Properties: &amqp.MessageProperties{ + ContentType: m.ContentType, + }, + ApplicationProperties: m.ApplicationProperties, + Data: [][]byte{m.Body}, + }) + } + + return nil, fmt.Errorf("failed to encode Event into a Message") +} + +// SetReceiver implements Transport.SetReceiver +func (t *Transport) SetReceiver(r transport.Receiver) { + t.Receiver = r +} + +// StartReceiver implements Transport.StartReceiver +// NOTE: This is a blocking call. +func (t *Transport) StartReceiver(ctx context.Context) error { + logger := cecontext.LoggerFrom(ctx) + + logger.Info("StartReceiver on ", t.Queue) + + t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(t.Queue)) + receiver, err := t.Session.NewReceiver(t.receiverLinkOpts...) + if err != nil { + return err + } + + if ok := t.loadCodec(); !ok { + return fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) + } + + ctx, cancel := context.WithCancel(ctx) + go func() { + for { + // Receive next message + msg, err := receiver.Receive(ctx) + if err != nil { + logger.Errorw("Failed reading message from AMQP.", zap.Error(err)) + panic(err) // TODO: panic? + } + + m := &Message{ + Body: msg.Data[0], // TODO: omg why is it a list of lists? + ContentType: msg.Properties.ContentType, + ApplicationProperties: msg.ApplicationProperties, + } + event, err := t.codec.Decode(m) + if err != nil { + logger.Errorw("failed to decode message", zap.Error(err)) // TODO: create an error channel to pass this up + } else { + // TODO: I do not know enough about amqp to implement reply. + // For now, amqp does not support reply. + if err := t.Receiver.Receive(context.TODO(), *event, nil); err != nil { + logger.Warnw("amqp receiver return err", zap.Error(err)) + } else { + if err := msg.Accept(); err != nil { + logger.Warnw("amqp accept return err", zap.Error(err)) + } + } + } + } + }() + + <-ctx.Done() + + _ = receiver.Close(ctx) + cancel() + return nil +} diff --git a/pkg/cloudevents/transport/http/codec.go b/pkg/cloudevents/transport/http/codec.go index 4ef207ccb..513c923f6 100644 --- a/pkg/cloudevents/transport/http/codec.go +++ b/pkg/cloudevents/transport/http/codec.go @@ -3,7 +3,6 @@ package http import ( "fmt" "github.com/cloudevents/sdk-go/pkg/cloudevents" - "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" ) @@ -209,18 +208,3 @@ func (c *Codec) inspectEncoding(msg transport.Message) Encoding { // We do not understand the message encoding. return Unknown } - -// --------- -// TODO: Should move these somewhere else. the methods are shared for all versions. - -func marshalEventData(encoding string, data interface{}) ([]byte, error) { - if data == nil { - return []byte(nil), nil - } - // already encoded? - if b, ok := data.([]byte); ok { - return b, nil - } - - return datacodec.Encode(encoding, data) -}