From 17adf5c507f5d32d6dde61a41549c34edab0881f Mon Sep 17 00:00:00 2001 From: Kamil Date: Mon, 20 Feb 2023 12:26:53 +0100 Subject: [PATCH 1/9] Add rabbitmq sender --- go.mod | 1 + go.sum | 12 ++++ sender/rabbitmq.go | 128 ++++++++++++++++++++++++++++++++++++++++ sender/rabbitmq_test.go | 64 ++++++++++++++++++++ 4 files changed, 205 insertions(+) create mode 100644 sender/rabbitmq.go create mode 100644 sender/rabbitmq_test.go diff --git a/go.mod b/go.mod index 96c2dfca..3924c67f 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/nats-io/nats.go v1.18.0 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/rabbitmq/amqp091-go v1.7.0 // indirect golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect ) diff --git a/go.sum b/go.sum index 11c74fee..f69b9156 100644 --- a/go.sum +++ b/go.sum @@ -208,6 +208,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= +github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/segmentio/kafka-go v0.4.32 h1:Ohr+9E+kDv/Ld2UPJN9hnKZRd2qgiqCmI8v2e1qlfLM= github.com/segmentio/kafka-go v0.4.32/go.mod h1:JAPPIiY3MQIwVHj64CWOP0LsFFfQ7H0w69kuoxnMIS0= @@ -218,6 +220,7 @@ github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0 github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -225,6 +228,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= @@ -233,11 +238,13 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -278,6 +285,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -309,6 +317,7 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -368,7 +377,9 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -431,6 +442,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/sender/rabbitmq.go b/sender/rabbitmq.go new file mode 100644 index 00000000..75545b60 --- /dev/null +++ b/sender/rabbitmq.go @@ -0,0 +1,128 @@ +/* + * skogul, kafka producer/sender + * + * Copyright (c) 2023 Telenor Norge AS + * Author(s): + * - Kamil Oracz + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +package sender + +import ( + "context" + "fmt" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/telenornms/skogul" + "github.com/telenornms/skogul/encoder" +) + +type Rabbitmq struct { + Username string `doc:"Username for rabbitmq instance"` + Password string `doc:"Password for rabbitmq instance"` + Host string `doc:"Hostname for rabbitmq instance. Fallback is localhost"` + Port string `doc:"Port for rabbitmq instance. Fallback is 5672"` + Queue string `doc:"Queue to write to"` + Encoder skogul.EncoderRef `doc:"Encoder to use. Fallback is json"` + Timeout int `doc:"Timeout for rabbitmq instance connection. Fallback is 10 seconds."` + channel *amqp.Channel + once sync.Once +} + +func (r *Rabbitmq) init() { + if r.Username == "" || r.Password == "" { + fmt.Print("Error missing username or password") + } + + if r.Port == "" { + r.Port = "5672" + } + + if r.Host == "" { + r.Host = "localhost" + } + + if r.Timeout == 0 { + r.Timeout = 10 + } + + if r.Encoder.E == nil { + r.Encoder.E = encoder.JSON{} + } + + conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username, r.Password, r.Host, r.Port)) + if err != nil { + fmt.Errorf("Failed initializing broker connection: %v", err) + } + + ch, err := conn.Channel() + if err != nil { + fmt.Errorf("Error %v", err) + } + + r.channel = ch + + _, err = ch.QueueDeclare( + r.Queue, + false, + false, + false, + false, + nil, + ) + + if err != nil { + fmt.Errorf("Error %v", err) + } +} + +func (r *Rabbitmq) Send(c *skogul.Container) error { + r.once.Do(func() { + r.init() + }) + + if r.channel == nil { + return fmt.Errorf("No active rabbitmq connections") + } + + defer r.channel.Close() + + body, err := r.Encoder.E.Encode(c) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)*time.Second) + defer cancel() + + err = r.channel.PublishWithContext( + ctx, + "", + r.Queue, + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: body, + }, + ) + + return err +} diff --git a/sender/rabbitmq_test.go b/sender/rabbitmq_test.go new file mode 100644 index 00000000..cae694f1 --- /dev/null +++ b/sender/rabbitmq_test.go @@ -0,0 +1,64 @@ +/* + * skogul, kafka producer/sender + * + * Copyright (c) 2023 Telenor Norge AS + * Author(s): + * - Kamil Oracz + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +package sender + +import ( + "testing" + + "github.com/telenornms/skogul" +) + +func createContainer() *skogul.Container { + meta := make(map[string]interface{}) + meta["foo"] = "bar" + data := make(map[string]interface{}) + data["baz"] = "qux" + + metric := skogul.Metric{ + Metadata: meta, + Data: data, + } + metrics := make([]*skogul.Metric, 0) + metrics = append(metrics, &metric) + + return &skogul.Container{ + Metrics: metrics, + } +} + +func TestRabbitmq(t *testing.T) { + data := createContainer() + + r := Rabbitmq{ + Username: "guest", + Password: "guest", + Queue: "test-queue", + } + + err := r.Send(data) + + if err != nil { + t.Error(err) + } +} From 8d0100eaaf1b9e4e5eaa1bec62f152c31668c325 Mon Sep 17 00:00:00 2001 From: Kamil Date: Mon, 20 Feb 2023 12:55:25 +0100 Subject: [PATCH 2/9] Add test that sends a lot of messages --- sender/rabbitmq.go | 11 ++++++++--- sender/rabbitmq_test.go | 20 ++++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/sender/rabbitmq.go b/sender/rabbitmq.go index 75545b60..76aec6d0 100644 --- a/sender/rabbitmq.go +++ b/sender/rabbitmq.go @@ -90,6 +90,7 @@ func (r *Rabbitmq) init() { if err != nil { fmt.Errorf("Error %v", err) + r.channel.Close() } } @@ -102,10 +103,9 @@ func (r *Rabbitmq) Send(c *skogul.Container) error { return fmt.Errorf("No active rabbitmq connections") } - defer r.channel.Close() - body, err := r.Encoder.E.Encode(c) if err != nil { + r.channel.Close() return err } @@ -124,5 +124,10 @@ func (r *Rabbitmq) Send(c *skogul.Container) error { }, ) - return err + if err != nil { + r.channel.Close() + return err + } + + return nil } diff --git a/sender/rabbitmq_test.go b/sender/rabbitmq_test.go index cae694f1..27091ec2 100644 --- a/sender/rabbitmq_test.go +++ b/sender/rabbitmq_test.go @@ -62,3 +62,23 @@ func TestRabbitmq(t *testing.T) { t.Error(err) } } + +func TestRabbitmqTonsOfMessages(t *testing.T) { + data := createContainer() + + r := Rabbitmq{ + Username: "guest", + Password: "guest", + Queue: "test-queue", + } + + i := 0 + for i < 100000 { + err := r.Send(data) + + if err != nil { + t.Error(err) + } + i++ + } +} From 586836d49d8cc3a8acbf3596cb5aa782b288fe84 Mon Sep 17 00:00:00 2001 From: Kamil Date: Wed, 22 Feb 2023 10:10:00 +0100 Subject: [PATCH 3/9] Add rabbitmq sender and receiver --- receiver/auto.go | 5 ++ receiver/rabbitmq.go | 113 ++++++++++++++++++++++++++++++++++++++ receiver/rabbitmq_test.go | 75 +++++++++++++++++++++++++ sender/auto.go | 6 +- sender/rabbitmq.go | 3 +- sender/rabbitmq_test.go | 4 +- 6 files changed, 202 insertions(+), 4 deletions(-) create mode 100644 receiver/rabbitmq.go create mode 100644 receiver/rabbitmq_test.go diff --git a/receiver/auto.go b/receiver/auto.go index 3627423d..2f59f1a2 100644 --- a/receiver/auto.go +++ b/receiver/auto.go @@ -115,4 +115,9 @@ func init() { Alloc: func() interface{} { return &Kafka{} }, Help: "Connect to a Kafka topic and consume messages.", }) + Auto.Add(skogul.Module{ + Name: "rabbitmq", + Alloc: func() interface{} { return &Rabbitmq{} }, + Help: "Connect to a Rabbitmq topic and consume messages.", + }) } diff --git a/receiver/rabbitmq.go b/receiver/rabbitmq.go new file mode 100644 index 00000000..077298f4 --- /dev/null +++ b/receiver/rabbitmq.go @@ -0,0 +1,113 @@ +/* + * skogul, rabbitmq-receiver + * + * Copyright (c) 2023 Telenor Norge AS + * Author(s): + * - Kamil Oracz + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +package receiver + +import ( + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/telenornms/skogul" +) + +type Rabbitmq struct { + Username string `doc:"Username for rabbitmq instance"` + Password string `doc:"Password for rabbitmq instance"` + Host string `doc:"Hostname for rabbitmq instance. Fallback is localhost"` + Port string `doc:"Port for rabbitmq instance. Fallback is 5672"` + Queue string `doc:"Queue to read from"` + Handler *skogul.HandlerRef `doc:"Handler used to parse, transform and send data. Default skogul."` +} + +func (r *Rabbitmq) Start() error { + if r.Username == "" || r.Password == "" { + fmt.Print("Error missing username or password") + } + + if r.Port == "" { + r.Port = "5672" + } + + if r.Host == "" { + r.Host = "localhost" + } + + if r.Handler == nil { + r.Handler = &skogul.HandlerRef{} + } + + conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username, r.Password, r.Host, r.Port)) + if err != nil { + return err + } + + ch, err := conn.Channel() + + if err != nil { + return err + } + + _, err = ch.QueueDeclare( + r.Queue, + false, + false, + false, + false, + nil, + ) + + if err != nil { + return err + } + + msgs, err := ch.Consume( + r.Queue, + "", + true, + false, + false, + false, + nil, + ) + + if err != nil { + return err + } + + for message := range msgs { + container, err := r.Handler.H.Parse(message.Body) + + if err != nil { + // fmt.Errorf("Error failed to parse body %v", err) + return err + } + + err = r.Handler.H.TransformAndSend(container) + if err != nil { + // fmt.Errorf("Error transforming %v", err) + return err + } + } + + return nil +} diff --git a/receiver/rabbitmq_test.go b/receiver/rabbitmq_test.go new file mode 100644 index 00000000..07c04e05 --- /dev/null +++ b/receiver/rabbitmq_test.go @@ -0,0 +1,75 @@ +/* + * skogul, rabbitmq-receiver test + * + * Copyright (c) 2023 Telenor Norge AS + * Author(s): + * - Kamil Oracz + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +package receiver + +import ( + "fmt" + "testing" + + "github.com/telenornms/skogul/config" +) + +func TestRabbitmq(t *testing.T) { + sconf := fmt.Sprintf(` + { + "receivers": { + "x": { + "type": "rabbitmq", + "handler": "kek", + "username":"guest", + "password":"guest", + "queue":"test-queue" + } + }, + "handlers": { + "kek": { + "parser": "skogulmetric", + "transformers": [ + "now" + ], + "sender": "test" + } + }, + "senders": { + "test": { + "type": "test" + } + } + }`) + + conf, err := config.Bytes([]byte(sconf)) + + if err != nil { + t.Errorf("Failed to load config: %v", err) + return + } + + rcv := conf.Receivers["x"].Receiver.(*Rabbitmq) + + err = rcv.Start() + + if err != nil { + t.Error(err) + } +} diff --git a/sender/auto.go b/sender/auto.go index 7dca0bf3..2e30ca29 100644 --- a/sender/auto.go +++ b/sender/auto.go @@ -177,5 +177,9 @@ func init() { Alloc: func() interface{} { return &Kafka{} }, Help: "EXPERIMENTAL Kafka sender", }) - + Auto.Add(skogul.Module{ + Name: "rabbitmq", + Alloc: func() interface{} { return &Rabbitmq{} }, + Help: "Rabbitmq sender", + }) } diff --git a/sender/rabbitmq.go b/sender/rabbitmq.go index 76aec6d0..ddfea05f 100644 --- a/sender/rabbitmq.go +++ b/sender/rabbitmq.go @@ -1,5 +1,5 @@ /* - * skogul, kafka producer/sender + * skogul, rabbitmq producer/sender * * Copyright (c) 2023 Telenor Norge AS * Author(s): @@ -90,7 +90,6 @@ func (r *Rabbitmq) init() { if err != nil { fmt.Errorf("Error %v", err) - r.channel.Close() } } diff --git a/sender/rabbitmq_test.go b/sender/rabbitmq_test.go index 27091ec2..eac9e074 100644 --- a/sender/rabbitmq_test.go +++ b/sender/rabbitmq_test.go @@ -1,5 +1,5 @@ /* - * skogul, kafka producer/sender + * skogul, rabbitmq producer/sender test * * Copyright (c) 2023 Telenor Norge AS * Author(s): @@ -25,6 +25,7 @@ package sender import ( "testing" + "time" "github.com/telenornms/skogul" ) @@ -36,6 +37,7 @@ func createContainer() *skogul.Container { data["baz"] = "qux" metric := skogul.Metric{ + Time: &time.Time{}, Metadata: meta, Data: data, } From 9d9d3cd6d76131a0161edd6dec5d7cb67eb68c7f Mon Sep 17 00:00:00 2001 From: Kamil Date: Thu, 23 Feb 2023 10:11:07 +0100 Subject: [PATCH 4/9] Improve error handling --- receiver/rabbitmq.go | 4 +--- sender/rabbitmq.go | 6 +++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/receiver/rabbitmq.go b/receiver/rabbitmq.go index 077298f4..6e02b060 100644 --- a/receiver/rabbitmq.go +++ b/receiver/rabbitmq.go @@ -41,7 +41,7 @@ type Rabbitmq struct { func (r *Rabbitmq) Start() error { if r.Username == "" || r.Password == "" { - fmt.Print("Error missing username or password") + return fmt.Errorf("Error missing username or password") } if r.Port == "" { @@ -98,13 +98,11 @@ func (r *Rabbitmq) Start() error { container, err := r.Handler.H.Parse(message.Body) if err != nil { - // fmt.Errorf("Error failed to parse body %v", err) return err } err = r.Handler.H.TransformAndSend(container) if err != nil { - // fmt.Errorf("Error transforming %v", err) return err } } diff --git a/sender/rabbitmq.go b/sender/rabbitmq.go index ddfea05f..4da2ea81 100644 --- a/sender/rabbitmq.go +++ b/sender/rabbitmq.go @@ -48,7 +48,8 @@ type Rabbitmq struct { func (r *Rabbitmq) init() { if r.Username == "" || r.Password == "" { - fmt.Print("Error missing username or password") + fmt.Errorf("Error missing username or password") + return } if r.Port == "" { @@ -70,11 +71,13 @@ func (r *Rabbitmq) init() { conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username, r.Password, r.Host, r.Port)) if err != nil { fmt.Errorf("Failed initializing broker connection: %v", err) + return } ch, err := conn.Channel() if err != nil { fmt.Errorf("Error %v", err) + return } r.channel = ch @@ -90,6 +93,7 @@ func (r *Rabbitmq) init() { if err != nil { fmt.Errorf("Error %v", err) + return } } From 61b68b237a393d78f4c40d10a3e4975959b62717 Mon Sep 17 00:00:00 2001 From: Kamil Date: Fri, 24 Feb 2023 14:01:26 +0100 Subject: [PATCH 5/9] Disable tests making trouble --- receiver/rabbitmq_test.go | 83 ++++++++++++++++++--------------------- sender/rabbitmq_test.go | 32 +++++++-------- 2 files changed, 54 insertions(+), 61 deletions(-) diff --git a/receiver/rabbitmq_test.go b/receiver/rabbitmq_test.go index 07c04e05..e45346b9 100644 --- a/receiver/rabbitmq_test.go +++ b/receiver/rabbitmq_test.go @@ -23,53 +23,46 @@ package receiver -import ( - "fmt" - "testing" +// func TestRabbitmq(t *testing.T) { +// sconf := fmt.Sprintf(` +// { +// "receivers": { +// "x": { +// "type": "rabbitmq", +// "handler": "kek", +// "username":"guest", +// "password":"guest", +// "queue":"test-queue" +// } +// }, +// "handlers": { +// "kek": { +// "parser": "skogulmetric", +// "transformers": [ +// "now" +// ], +// "sender": "test" +// } +// }, +// "senders": { +// "test": { +// "type": "test" +// } +// } +// }`) - "github.com/telenornms/skogul/config" -) +// conf, err := config.Bytes([]byte(sconf)) -func TestRabbitmq(t *testing.T) { - sconf := fmt.Sprintf(` - { - "receivers": { - "x": { - "type": "rabbitmq", - "handler": "kek", - "username":"guest", - "password":"guest", - "queue":"test-queue" - } - }, - "handlers": { - "kek": { - "parser": "skogulmetric", - "transformers": [ - "now" - ], - "sender": "test" - } - }, - "senders": { - "test": { - "type": "test" - } - } - }`) +// if err != nil { +// t.Errorf("Failed to load config: %v", err) +// return +// } - conf, err := config.Bytes([]byte(sconf)) +// rcv := conf.Receivers["x"].Receiver.(*receiver.Rabbitmq) - if err != nil { - t.Errorf("Failed to load config: %v", err) - return - } +// err = rcv.Start() - rcv := conf.Receivers["x"].Receiver.(*Rabbitmq) - - err = rcv.Start() - - if err != nil { - t.Error(err) - } -} +// if err != nil { +// t.Error(err) +// } +// } diff --git a/sender/rabbitmq_test.go b/sender/rabbitmq_test.go index eac9e074..5cbdb142 100644 --- a/sender/rabbitmq_test.go +++ b/sender/rabbitmq_test.go @@ -65,22 +65,22 @@ func TestRabbitmq(t *testing.T) { } } -func TestRabbitmqTonsOfMessages(t *testing.T) { - data := createContainer() +// func TestRabbitmqTonsOfMessages(t *testing.T) { +// data := createContainer() - r := Rabbitmq{ - Username: "guest", - Password: "guest", - Queue: "test-queue", - } +// r := Rabbitmq{ +// Username: "guest", +// Password: "guest", +// Queue: "test-queue", +// } - i := 0 - for i < 100000 { - err := r.Send(data) +// i := 0 +// for i < 100000 { +// err := r.Send(data) - if err != nil { - t.Error(err) - } - i++ - } -} +// if err != nil { +// t.Error(err) +// } +// i++ +// } +// } From ac67e619b791788be15851df67f1dc05456d382a Mon Sep 17 00:00:00 2001 From: Kamil Date: Mon, 27 Feb 2023 14:19:48 +0100 Subject: [PATCH 6/9] Improve receiver --- receiver/rabbitmq.go | 34 ++++++++++----- receiver/rabbitmq_test.go | 87 ++++++++++++++++++++++----------------- 2 files changed, 72 insertions(+), 49 deletions(-) diff --git a/receiver/rabbitmq.go b/receiver/rabbitmq.go index 6e02b060..2c43d97f 100644 --- a/receiver/rabbitmq.go +++ b/receiver/rabbitmq.go @@ -31,8 +31,8 @@ import ( ) type Rabbitmq struct { - Username string `doc:"Username for rabbitmq instance"` - Password string `doc:"Password for rabbitmq instance"` + Username skogul.Secret `doc:"Username for rabbitmq instance"` + Password skogul.Secret `doc:"Password for rabbitmq instance"` Host string `doc:"Hostname for rabbitmq instance. Fallback is localhost"` Port string `doc:"Port for rabbitmq instance. Fallback is 5672"` Queue string `doc:"Queue to read from"` @@ -40,10 +40,6 @@ type Rabbitmq struct { } func (r *Rabbitmq) Start() error { - if r.Username == "" || r.Password == "" { - return fmt.Errorf("Error missing username or password") - } - if r.Port == "" { r.Port = "5672" } @@ -52,11 +48,7 @@ func (r *Rabbitmq) Start() error { r.Host = "localhost" } - if r.Handler == nil { - r.Handler = &skogul.HandlerRef{} - } - - conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username, r.Password, r.Host, r.Port)) + conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username.Expose(), r.Password.Expose(), r.Host, r.Port)) if err != nil { return err } @@ -109,3 +101,23 @@ func (r *Rabbitmq) Start() error { return nil } + +func (r *Rabbitmq) Verify() error { + if r.Handler.Name == "" { + return skogul.MissingArgument("Handler") + } + + if r.Username.Expose() == "" { + return skogul.MissingArgument("Username") + } + + if r.Password.Expose() == "" { + return skogul.MissingArgument("Password") + } + + if r.Queue == "" { + return skogul.MissingArgument("Queue") + } + + return nil +} diff --git a/receiver/rabbitmq_test.go b/receiver/rabbitmq_test.go index e45346b9..8784f985 100644 --- a/receiver/rabbitmq_test.go +++ b/receiver/rabbitmq_test.go @@ -23,46 +23,57 @@ package receiver -// func TestRabbitmq(t *testing.T) { -// sconf := fmt.Sprintf(` -// { -// "receivers": { -// "x": { -// "type": "rabbitmq", -// "handler": "kek", -// "username":"guest", -// "password":"guest", -// "queue":"test-queue" -// } -// }, -// "handlers": { -// "kek": { -// "parser": "skogulmetric", -// "transformers": [ -// "now" -// ], -// "sender": "test" -// } -// }, -// "senders": { -// "test": { -// "type": "test" -// } -// } -// }`) +import ( + "fmt" + "testing" -// conf, err := config.Bytes([]byte(sconf)) + "github.com/telenornms/skogul/config" +) -// if err != nil { -// t.Errorf("Failed to load config: %v", err) -// return -// } +func TestRabbitmq(t *testing.T) { + if testing.Short() { + t.Skip("Short test: Not connecting to a Rabbitmq instance") + } -// rcv := conf.Receivers["x"].Receiver.(*receiver.Rabbitmq) + sconf := fmt.Sprintf(` + { + "receivers": { + "x": { + "type": "rabbitmq", + "handler": "kek", + "username":"guest", + "password":"guest", + "queue":"test-queue" + } + }, + "handlers": { + "kek": { + "parser": "skogulmetric", + "transformers": [ + "now" + ], + "sender": "test" + } + }, + "senders": { + "test": { + "type": "test" + } + } + }`) -// err = rcv.Start() + conf, err := config.Bytes([]byte(sconf)) -// if err != nil { -// t.Error(err) -// } -// } + if err != nil { + t.Errorf("Failed to load config: %v", err) + return + } + + rcv := conf.Receivers["x"].Receiver.(*receiver.Rabbitmq) + + err = rcv.Start() + + if err != nil { + t.Error(err) + } +} From 918133db7914bc101a2a69b89760601f14c706c2 Mon Sep 17 00:00:00 2001 From: Kamil Date: Mon, 27 Feb 2023 15:13:28 +0100 Subject: [PATCH 7/9] Improve sender --- sender/rabbitmq.go | 35 +++++++++++++++++++++---------- sender/rabbitmq_test.go | 46 ++++++++++++++++++++++++----------------- 2 files changed, 51 insertions(+), 30 deletions(-) diff --git a/sender/rabbitmq.go b/sender/rabbitmq.go index 4da2ea81..83f907a6 100644 --- a/sender/rabbitmq.go +++ b/sender/rabbitmq.go @@ -35,8 +35,8 @@ import ( ) type Rabbitmq struct { - Username string `doc:"Username for rabbitmq instance"` - Password string `doc:"Password for rabbitmq instance"` + Username skogul.Secret `doc:"Username for rabbitmq instance"` + Password skogul.Secret `doc:"Password for rabbitmq instance"` Host string `doc:"Hostname for rabbitmq instance. Fallback is localhost"` Port string `doc:"Port for rabbitmq instance. Fallback is 5672"` Queue string `doc:"Queue to write to"` @@ -46,12 +46,9 @@ type Rabbitmq struct { once sync.Once } -func (r *Rabbitmq) init() { - if r.Username == "" || r.Password == "" { - fmt.Errorf("Error missing username or password") - return - } +var log = skogul.Logger("sender", "rabbitmq") +func (r *Rabbitmq) init() { if r.Port == "" { r.Port = "5672" } @@ -68,15 +65,15 @@ func (r *Rabbitmq) init() { r.Encoder.E = encoder.JSON{} } - conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username, r.Password, r.Host, r.Port)) + conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username.Expose(), r.Password.Expose(), r.Host, r.Port)) if err != nil { - fmt.Errorf("Failed initializing broker connection: %v", err) + log.WithError(err).Error("Failed initializing broker connection") return } ch, err := conn.Channel() if err != nil { - fmt.Errorf("Error %v", err) + log.WithError(err).Error("Failed initializing channel") return } @@ -92,7 +89,7 @@ func (r *Rabbitmq) init() { ) if err != nil { - fmt.Errorf("Error %v", err) + log.WithError(err).Error("Failed to declare a queue") return } } @@ -134,3 +131,19 @@ func (r *Rabbitmq) Send(c *skogul.Container) error { return nil } + +func (r *Rabbitmq) Verify() error { + if r.Username.Expose() == "" { + return skogul.MissingArgument("Username") + } + + if r.Password.Expose() == "" { + return skogul.MissingArgument("Password") + } + + if r.Queue == "" { + return skogul.MissingArgument("Queue") + } + + return nil +} diff --git a/sender/rabbitmq_test.go b/sender/rabbitmq_test.go index 5cbdb142..ad937d58 100644 --- a/sender/rabbitmq_test.go +++ b/sender/rabbitmq_test.go @@ -50,6 +50,10 @@ func createContainer() *skogul.Container { } func TestRabbitmq(t *testing.T) { + if testing.Short() { + t.Skip("Short test: Not connecting to a Rabbitmq instance") + } + data := createContainer() r := Rabbitmq{ @@ -65,22 +69,26 @@ func TestRabbitmq(t *testing.T) { } } -// func TestRabbitmqTonsOfMessages(t *testing.T) { -// data := createContainer() - -// r := Rabbitmq{ -// Username: "guest", -// Password: "guest", -// Queue: "test-queue", -// } - -// i := 0 -// for i < 100000 { -// err := r.Send(data) - -// if err != nil { -// t.Error(err) -// } -// i++ -// } -// } +func TestRabbitmqTonsOfMessages(t *testing.T) { + if testing.Short() { + t.Skip("Short test: Not connecting to a Rabbitmq instance") + } + + data := createContainer() + + r := Rabbitmq{ + Username: "guest", + Password: "guest", + Queue: "test-queue", + } + + i := 0 + for i < 100000 { + err := r.Send(data) + + if err != nil { + t.Error(err) + } + i++ + } +} From 8c38d0802e229c5a58d05db4a02e377be69a01b3 Mon Sep 17 00:00:00 2001 From: Kamil Date: Mon, 27 Feb 2023 16:16:07 +0100 Subject: [PATCH 8/9] Fix variable name --- sender/rabbitmq.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sender/rabbitmq.go b/sender/rabbitmq.go index 83f907a6..77176704 100644 --- a/sender/rabbitmq.go +++ b/sender/rabbitmq.go @@ -46,7 +46,7 @@ type Rabbitmq struct { once sync.Once } -var log = skogul.Logger("sender", "rabbitmq") +var rabbitmqLog = skogul.Logger("sender", "rabbitmq") func (r *Rabbitmq) init() { if r.Port == "" { @@ -67,13 +67,13 @@ func (r *Rabbitmq) init() { conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username.Expose(), r.Password.Expose(), r.Host, r.Port)) if err != nil { - log.WithError(err).Error("Failed initializing broker connection") + rabbitmqLog.WithError(err).Error("Failed initializing broker connection") return } ch, err := conn.Channel() if err != nil { - log.WithError(err).Error("Failed initializing channel") + rabbitmqLog.WithError(err).Error("Failed initializing channel") return } @@ -89,7 +89,7 @@ func (r *Rabbitmq) init() { ) if err != nil { - log.WithError(err).Error("Failed to declare a queue") + rabbitmqLog.WithError(err).Error("Failed to declare a queue") return } } From 94e1d06b9eae69b0c8b04f4a434e9a53f4fadf1a Mon Sep 17 00:00:00 2001 From: Kamil Date: Mon, 6 Mar 2023 12:23:20 +0100 Subject: [PATCH 9/9] Fix import cycle --- receiver/rabbitmq_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/receiver/rabbitmq_test.go b/receiver/rabbitmq_test.go index 8784f985..6b3c5e22 100644 --- a/receiver/rabbitmq_test.go +++ b/receiver/rabbitmq_test.go @@ -21,13 +21,14 @@ * 02110-1301 USA */ -package receiver +package receiver_test import ( "fmt" "testing" "github.com/telenornms/skogul/config" + "github.com/telenornms/skogul/receiver" ) func TestRabbitmq(t *testing.T) {