From 4ffa97060af814fe6370e7ac2054a3ab72b1b375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20M=C3=A1gr?= Date: Fri, 17 Feb 2023 19:52:26 +0100 Subject: [PATCH] Improve AMQP-1.0 transport (#26) * AMQP-1.0 improvements - avoid flooding with links - enable reconnection - linter fixes --- connector/amqp10/amqp10.go | 500 +++++++++++++++++++++++++++---------- system/signals.go | 2 +- tests/connector_test.go | 86 +++++-- tests/iniconfig_test.go | 38 +-- tests/jsonconfig_test.go | 22 +- 5 files changed, 465 insertions(+), 183 deletions(-) diff --git a/connector/amqp10/amqp10.go b/connector/amqp10/amqp10.go index ffd3147..d120613 100644 --- a/connector/amqp10/amqp10.go +++ b/connector/amqp10/amqp10.go @@ -2,7 +2,9 @@ package amqp10 import ( "fmt" + "math/rand" "strings" + "sync" "time" "github.com/infrawatch/apputils/config" @@ -13,43 +15,75 @@ import ( ) const ( - defaultSendTimeout = 2 - defaultListenPrefetch = -1 - defaultClientName = "localhost" + defaultSendTimeout = 2 + defaultListenPrefetch = -1 + defaultClientName = "localhost" + defaultLinkFailureLimit = 20 ) -//AMQP10Receiver is tagged electron receiver +// AMQP10Receiver is tagged electron receiver type AMQP10Receiver struct { Receiver electron.Receiver Tags []string } -//AMQP10Connector is the object to be used for communication with AMQP-1.0 entity +// AMQP10Connector is the object to be used for communication with AMQP-1.0 entity type AMQP10Connector struct { - Address string - ClientName string - SendTimeout int64 - inConnection electron.Connection - outConnection electron.Connection - receivers []AMQP10Receiver - logger *logging.Logger + Address string + ClientName string + SendTimeout int64 + ListenPrefetch int64 + LinkFailureLimit int64 + appName string + inConnection electron.Connection + outConnection electron.Connection + receivers []AMQP10Receiver + senders map[string]electron.Sender + logger *logging.Logger + interrupt chan bool } -//AMQP10Message holds received (or to be sent) messages from (to) AMQP-1.0 entity +// AMQP10Message holds received (or to be sent) messages from (to) AMQP-1.0 entity type AMQP10Message struct { Address string Body string Tags []string } -//CreateAMQP10Connector creates the connector and connects to given AMQP1.0 service -func CreateAMQP10Connector(logger *logging.Logger, address string, clientName string, sendTimeout int64, listenPrefetch int64, listenChannels []string) (*AMQP10Connector, error) { +//-------------------------- constructors and their helpers -------------------------- + +func bindListenChannels(conn *AMQP10Connector, listen []string) error { + for _, channel := range listen { + if len(channel) < 1 { + continue + } + conn.logger.Metadata(logging.Metadata{ + "channel": channel, + "prefetch": conn.ListenPrefetch, + }) + conn.logger.Debug("Creating AMQP receiver for channel") + if err := conn.CreateReceiver(channel, int(conn.ListenPrefetch)); err != nil { + return fmt.Errorf("Failed to create receiver: %s", err) + } + } + return nil +} + +// CreateAMQP10Connector creates the connector and connects to given AMQP1.0 service +func CreateAMQP10Connector(logger *logging.Logger, address string, clientName string, + appName string, sendTimeout int64, linkFailureLimit int64, listenPrefetch int64, + listenChannels []string) (*AMQP10Connector, error) { connector := AMQP10Connector{ - Address: address, - ClientName: clientName, - SendTimeout: sendTimeout, - logger: logger, - receivers: make([]AMQP10Receiver, 0), + Address: address, + ClientName: clientName, + SendTimeout: sendTimeout, + ListenPrefetch: listenPrefetch, + LinkFailureLimit: linkFailureLimit, + appName: appName, + logger: logger, + receivers: make([]AMQP10Receiver, 0), + senders: make(map[string]electron.Sender), + interrupt: make(chan bool), } // connect @@ -57,24 +91,14 @@ func CreateAMQP10Connector(logger *logging.Logger, address string, clientName st return &connector, fmt.Errorf("Error while connecting to AMQP") } // bind to channels - for _, channel := range listenChannels { - if len(channel) < 1 { - continue - } - logger.Metadata(map[string]interface{}{ - "channel": channel, - "prefetch": listenPrefetch, - }) - logger.Debug("Creating AMQP receiver for channel") - if err := connector.CreateReceiver(channel, int(listenPrefetch)); err != nil { - return &connector, fmt.Errorf("Failed to create receiver: %s", err) - } + if err := bindListenChannels(&connector, listenChannels); err != nil { + return &connector, fmt.Errorf("Error while creating receivers") } return &connector, nil } -//ConnectAMQP10 creates new AMQP1.0 connector from the given configuration file -func ConnectAMQP10(cfg config.Config, logger *logging.Logger) (*AMQP10Connector, error) { +// ConnectAMQP10 creates new AMQP1.0 connector from the given configuration file +func ConnectAMQP10(appName string, cfg config.Config, logger *logging.Logger) (*AMQP10Connector, error) { var err error var opt *config.Option @@ -100,25 +124,30 @@ func ConnectAMQP10(cfg config.Config, logger *logging.Logger) (*AMQP10Connector, case *config.JSONConfig: opt, err = conf.GetOption("Amqp1.Connection.SendTimeout") } - if err != nil { - return nil, err - } sendTimeout := int64(defaultSendTimeout) - if opt != nil { + if opt != nil && err == nil { sendTimeout = opt.GetInt() } + switch conf := cfg.(type) { + case *config.INIConfig: + opt, err = conf.GetOption("amqp1/link_failure_limit") + case *config.JSONConfig: + opt, err = conf.GetOption("Amqp1.Connection.LinkFailureLimit") + } + linkLimit := int64(defaultLinkFailureLimit) + if opt != nil && err == nil { + linkLimit = opt.GetInt() + } + switch conf := cfg.(type) { case *config.INIConfig: opt, err = conf.GetOption("amqp1/client_name") case *config.JSONConfig: opt, err = conf.GetOption("Amqp1.Client.Name") } - if err != nil { - return nil, err - } clientName := defaultClientName - if opt != nil { + if opt != nil && err == nil { clientName = opt.GetString() } @@ -128,11 +157,8 @@ func ConnectAMQP10(cfg config.Config, logger *logging.Logger) (*AMQP10Connector, case *config.JSONConfig: opt, err = conf.GetOption("Amqp1.Connection.ListenChannels") } - if err != nil { - return nil, err - } listen := []string{} - if opt != nil { + if opt != nil && err == nil { listen = opt.GetStrings(",") } @@ -142,55 +168,71 @@ func ConnectAMQP10(cfg config.Config, logger *logging.Logger) (*AMQP10Connector, case *config.JSONConfig: opt, err = conf.GetOption("Amqp1.Connection.ListenPrefetch") } - if err != nil { - return nil, err - } prf := int64(defaultListenPrefetch) - if opt != nil { + if opt != nil && err == nil { prf = opt.GetInt() } - return CreateAMQP10Connector(logger, addr, clientName, sendTimeout, prf, listen) + return CreateAMQP10Connector(logger, addr, clientName, appName, sendTimeout, linkLimit, prf, listen) } -//Connect creates input and output connection to configured AMQP1.0 node -func (conn *AMQP10Connector) Connect() error { - url, err := amqp.ParseURL(conn.Address) +//---------------------------- connect helpers and method ---------------------------- + +func dial(address, containerName string) (*electron.Connection, error) { + url, err := amqp.ParseURL(address) + if err != nil { + return nil, fmt.Errorf("Error while parsing AMQP1.0 URL: %s", address) + } + + container := electron.NewContainer(containerName) + conn, err := container.Dial("tcp", url.Host) + if err != nil { + return nil, fmt.Errorf("AMQP dial TCP error: %s", err.Error()) + } + return &conn, err +} + +func (conn *AMQP10Connector) connect(connType string) error { + container := fmt.Sprintf("%s-%s-%s-%d", conn.ClientName, conn.appName, connType, time.Now().Unix()-int64(rand.Intn(10))) + c, err := dial(conn.Address, container) if err != nil { conn.logger.Metadata(map[string]interface{}{ - "error": err, - "connection": conn.Address, + "container": container, }) - conn.logger.Debug("Error while parsing AMQP1.0 URL") + conn.logger.Debug("Failed to create AMQP1.0 connection") return err } - inContainer := electron.NewContainer(fmt.Sprintf("%s-infrawatch-in-%d", conn.ClientName, time.Now().Unix())) - cin, err := inContainer.Dial("tcp", url.Host) - if err != nil { + switch connType { + case "in": + conn.inConnection = *c + case "out": + conn.outConnection = *c + } + return nil +} + +// Connect creates input and output connection container for given appname for configured AMQP1.0 node +func (conn *AMQP10Connector) Connect() error { + if err := conn.connect("in"); err != nil { conn.logger.Metadata(map[string]interface{}{ "error": err, }) - conn.logger.Debug("AMQP dial TCP error") + conn.logger.Debug("Failed to create incoming connection") return err } - conn.inConnection = cin - outContainer := electron.NewContainer(fmt.Sprintf("%s-infrawatch-out-%d", conn.ClientName, time.Now().Unix())) - cout, err := outContainer.Dial("tcp", url.Host) - if err != nil { + if err := conn.connect("out"); err != nil { conn.logger.Metadata(map[string]interface{}{ "error": err, }) - conn.logger.Debug("AMQP dial TCP error") + conn.logger.Debug("Failed to create ougoing connection") return err } - conn.outConnection = cout - return nil } -//CreateReceiver creates electron.Receiver for given address +// CreateReceiver creates electron.Receiver for given address func (conn *AMQP10Connector) CreateReceiver(address string, prefetch int) error { addr := strings.TrimPrefix(address, "/") parts := strings.Split(addr, ":") @@ -205,9 +247,6 @@ func (conn *AMQP10Connector) CreateReceiver(address string, prefetch int) error opts = append(opts, electron.Capacity(prefetch), electron.Prefetch(true)) } - if conn.inConnection == nil { - return fmt.Errorf("Connection to AMQP-1.0 node has to be created first.") - } if rcv, err := conn.inConnection.Receiver(opts...); err == nil { conn.receivers = append(conn.receivers, AMQP10Receiver{rcv, parts[1:]}) } else { @@ -221,22 +260,30 @@ func (conn *AMQP10Connector) CreateReceiver(address string, prefetch int) error return nil } -//Reconnect tries to reconnect connector to configured AMQP1.0 node -func (conn *AMQP10Connector) Reconnect() error { +// CreateSender creates electron.Sender for given address +func (conn *AMQP10Connector) CreateSender(address string) (*electron.Sender, error) { + channel := strings.TrimPrefix(address, "/") + if s, ok := conn.senders[channel]; ok { + s.Close(nil) + delete(conn.senders, channel) + } - return nil + var err error + var snd electron.Sender + opts := []electron.LinkOption{electron.Target(address), electron.AtMostOnce()} + if snd, err = conn.outConnection.Sender(opts...); err != nil { + conn.logger.Metadata(logging.Metadata{ + "address": fmt.Sprintf("%s/%s", conn.Address, channel), + "error": err, + }) + conn.logger.Warn("Failed to create sender for given address") + return nil, fmt.Errorf("Failed to create sender") + } + conn.senders[channel] = snd + return &snd, nil } -//Disconnect closes all connections -func (conn *AMQP10Connector) Disconnect() { - conn.inConnection.Close(nil) - conn.outConnection.Close(nil) - conn.logger.Metadata(map[string]interface{}{ - "incoming": conn.inConnection, - "outgoing": conn.outConnection, - }) - conn.logger.Debug("Closed connections") -} +//---------------------------- reconnect helpers and method --------------------------- func (conn *AMQP10Connector) processIncomingMessage(msg interface{}, outchan chan interface{}, receiver AMQP10Receiver) { message := AMQP10Message{Address: receiver.Receiver.Source(), Tags: receiver.Tags} @@ -261,13 +308,22 @@ func (conn *AMQP10Connector) processIncomingMessage(msg interface{}, outchan cha } } -//Start starts all processing loops. Channel outchan will contain received AMQP10Message from AMQP1.0 node -// and through inchan AMQP10Message are sent to configured AMQP1.0 node -func (conn *AMQP10Connector) Start(outchan chan interface{}, inchan chan interface{}) { - //create listening goroutine for each receiver +func (conn *AMQP10Connector) startReceivers(outchan chan interface{}, wg *sync.WaitGroup) { for _, rcv := range conn.receivers { + wg.Add(1) go func(receiver AMQP10Receiver) { + defer wg.Done() + conn.logger.Metadata(logging.Metadata{ + "connection": conn.Address, + "address": receiver.Receiver.Source(), + }) + conn.logger.Warn("Created receiver") for { + select { + case <-conn.interrupt: + goto doneReceive + default: + } if msg, err := receiver.Receiver.Receive(); err == nil { msg.Accept() conn.processIncomingMessage(msg.Message.Body(), outchan, receiver) @@ -278,8 +334,7 @@ func (conn *AMQP10Connector) Start(outchan chan interface{}, inchan chan interfa "address": receiver.Receiver.Source(), }) conn.logger.Warn("Channel closed, closing receiver loop") - //TODO: send message to (future) reconnect loop, where it Reconnect and Start again - return + goto doneReceive } else { conn.logger.Metadata(map[string]interface{}{ "connection": conn.Address, @@ -289,58 +344,245 @@ func (conn *AMQP10Connector) Start(outchan chan interface{}, inchan chan interfa conn.logger.Error("Received AMQP1.0 error") } } + + doneReceive: + conn.logger.Metadata(map[string]interface{}{ + "connection": conn.Address, + "address": receiver.Receiver.Source(), + }) + conn.logger.Error("Shutting down receiver") }(rcv) } +} - //create sending goroutine - go func() { - for msg := range inchan { - switch message := msg.(type) { - case AMQP10Message: - sender, err := conn.outConnection.Sender(electron.Target(message.Address)) - if err != nil { +// Reconnect tries to reconnect connector to configured AMQP1.0 node. Returns nil if failed +func (conn *AMQP10Connector) Reconnect(connectionType string, outchan chan interface{}, wg *sync.WaitGroup) error { + listen := []string{} + switch connectionType { + case "in": + for r := range conn.receivers { + // get receiver data + tags := strings.Join(conn.receivers[r].Tags, ":") + address := conn.receivers[r].Receiver.Source() + if len(tags) > 0 { + address = strings.Join([]string{address, tags}, ":") + } + listen = append(listen, address) + // close receiver + conn.receivers[r].Receiver.Close(nil) + for { + if err := conn.receivers[r].Receiver.Sync(); err == electron.Closed { + break + } else { conn.logger.Metadata(map[string]interface{}{ - "connection": conn.Address, - "message": message, - "error": err, + "receiver": conn.receivers[r].Receiver, }) - conn.logger.Warn("Failed to create AMQP1.0 sender on given connection, skipping processing message") - continue + conn.logger.Debug("Waiting for receiver to be closed") + time.Sleep(time.Millisecond) } - conn.logger.Metadata(map[string]interface{}{ - "address": message.Address, - "body": message.Body, - }) - conn.logger.Debug("Sending AMQP1.0 message") + } + conn.logger.Metadata(map[string]interface{}{ + "receiver": conn.receivers[r].Receiver, + }) + conn.logger.Debug("Closed receiver link") + } + conn.receivers = []AMQP10Receiver{} + + conn.inConnection.Disconnect(fmt.Errorf("Reconnecting")) + conn.inConnection.Wait() + conn.logger.Debug("Disconnected incoming connection") + case "out": + for s := range conn.senders { + conn.senders[s].Close(fmt.Errorf("Reconnecting")) + delete(conn.senders, s) + conn.logger.Metadata(map[string]interface{}{ + "sender": conn.senders[s], + }) + conn.logger.Debug("Closed sender link") + } + conn.outConnection.Disconnect(fmt.Errorf("Reconnecting")) + conn.outConnection.Wait() + conn.logger.Debug("Disconnected outgoing connection") + default: + return fmt.Errorf("Wrong connection type. Should be 'in' or 'out'") + } - m := amqp.NewMessage() - m.SetContentType("application/json") - m.Marshal(message.Body) + if err := conn.connect(connectionType); err != nil { + conn.logger.Metadata(map[string]interface{}{ + "connection": connectionType, + "error": err, + }) + conn.logger.Error("Failed to reconnect") + return err + } - ackChan := sender.SendWaitable(m) - timer := time.NewTimer(time.Duration(conn.SendTimeout) * time.Second) + if connectionType == "in" { + if err := bindListenChannels(conn, listen); err != nil { + return fmt.Errorf("Error while creating receiver links") + } + conn.logger.Metadata(map[string]interface{}{ + "receivers": conn.receivers, + }) + conn.logger.Debug("Recreated receiver links") + // recreate receiving loops + conn.startReceivers(outchan, wg) + } + return nil +} - select { - case ack := <-ackChan: - if ack.Status != 2 { - conn.logger.Metadata(map[string]interface{}{ - "message": m, - "ack": ack, +// Disconnect closes connection in both directions +func (conn *AMQP10Connector) Disconnect() { + close(conn.interrupt) + time.Sleep(time.Second) + conn.inConnection.Disconnect(nil) + conn.outConnection.Disconnect(nil) + conn.logger.Metadata(map[string]interface{}{ + "incoming": conn.inConnection, + "outgoing": conn.outConnection, + }) + conn.logger.Debug("Closed connections") +} + +//---------------------------- message processing initiation method ---------------------------- + +// Start starts all processing loops. Channel outchan will contain received AMQP10Message from AMQP1.0 node +// and through inchan AMQP10Message are sent to configured AMQP1.0 node +func (conn *AMQP10Connector) Start(outchan chan interface{}, inchan chan interface{}) *sync.WaitGroup { + wg := sync.WaitGroup{} + + //create listening goroutine for each receiver + conn.startReceivers(outchan, &wg) + + ackchan := make(chan electron.Outcome) + linkFail := int64(0) + lfLock := sync.RWMutex{} + + // ACK and error verification goroutine + wg.Add(1) + go func(ackchan chan electron.Outcome) { + defer wg.Done() + for { + select { + case ack := <-ackchan: + if ack.Error != nil { + lfLock.Lock() + linkFail += 1 + lfLock.Unlock() + if ack.Error == electron.Timeout { + conn.logger.Metadata(logging.Metadata{ + "timeout": conn.SendTimeout, + "id": ack.Value, + }) + conn.logger.Error("Was not able to deliver message on time.") + } else { + conn.logger.Metadata(logging.Metadata{ + "id": ack.Value, + "err": ack.Error, }) - conn.logger.Warn("Sent message was not ACKed") + conn.logger.Error("Error delivering message.") } - case <-timer.C: - conn.logger.Metadata(map[string]interface{}{ + } else if ack.Status != 2 { + conn.logger.Metadata(logging.Metadata{ + "id": ack.Value, + "ack": ack.Status, + }) + conn.logger.Warn("Sent message was not ACKed.") + } else { + lfLock.Lock() + linkFail = 0 + lfLock.Unlock() + conn.logger.Metadata(logging.Metadata{ + "id": ack.Value, + }) + conn.logger.Debug("Sent message ACKed.") + } + case <-conn.interrupt: + goto doneAck + } + } + doneAck: + conn.logger.Debug("Shutting down ACK check coroutine.") + }(ackchan) + + //create sending goroutine + wg.Add(1) + go func(inchan chan interface{}) { + + defer wg.Done() + + timeout := time.Duration(conn.SendTimeout) * time.Second + counter := int64(0) + for { + lfLock.RLock() + failure := linkFail > conn.LinkFailureLimit + lfLock.RUnlock() + + if failure { + conn.logger.Warn("Too many link failures in row, reconnecting") + err := conn.Reconnect("out", outchan, &wg) + if err != nil { + conn.logger.Metadata(logging.Metadata{ + "error": err, + }) + conn.logger.Error("Unable to send data, shutting down sending loop") + goto doneSend + } + } + + select { + case msg := <-inchan: + switch message := msg.(type) { + case AMQP10Message: + var sender *electron.Sender + if s, ok := conn.senders[message.Address]; ok { + sender = &s + } else { + if s, err := conn.CreateSender(message.Address); err != nil { + conn.logger.Metadata(logging.Metadata{ + "reason": err, + }) + conn.logger.Warn("Skipping processing message") + (*s).Close(nil) + + lfLock.Lock() + linkFail += 1 + lfLock.Unlock() + continue + } else { + lfLock.Lock() + linkFail = 0 + lfLock.Unlock() + sender = s + } + } + + counter += int64(1) + m := amqp.NewMessageWith(message.Body) + m.SetMessageId(counter) + m.SetContentType("application/json") + + conn.logger.Metadata(logging.Metadata{ + "address": message.Address, "message": m, }) - conn.logger.Warn("Sent message timed out on ACK. Delivery not guaranteed.") + conn.logger.Debug("Sending AMQP1.0 message") + (*sender).SendAsyncTimeout(m, ackchan, m.MessageId(), timeout) + default: + conn.logger.Metadata(map[string]interface{}{ + "message": msg, + }) + conn.logger.Debug("Skipped processing of sent AMQP1.0 message with invalid type") } - default: - conn.logger.Metadata(map[string]interface{}{ - "message": msg, - }) - conn.logger.Debug("Skipped processing of sent AMQP1.0 message with invalid type") + case <-conn.interrupt: + goto doneSend } } - }() + doneSend: + conn.logger.Debug("Shutting down sending coroutine.") + for s := range conn.senders { + conn.senders[s].Close(nil) + } + }(inchan) + + return &wg } diff --git a/system/signals.go b/system/signals.go index 2cdbed6..bbb9afe 100644 --- a/system/signals.go +++ b/system/signals.go @@ -8,7 +8,7 @@ import ( ) //SpawnSignalHandler spawns goroutine which will wait for given interruption signal(s) -// and in case any is receiverend closes given channel to signal that program should be closed +// and in case any is received closes given channel to signal that program should be closed func SpawnSignalHandler(finish chan bool, logger *logging.Logger, watchedSignals ...os.Signal) { interruptChannel := make(chan os.Signal, 1) signal.Notify(interruptChannel, watchedSignals...) diff --git a/tests/connector_test.go b/tests/connector_test.go index 5ac6184..90da676 100644 --- a/tests/connector_test.go +++ b/tests/connector_test.go @@ -8,6 +8,7 @@ import ( "os" "path" "strconv" + "sync" "testing" "time" @@ -22,18 +23,19 @@ import ( ) const ( - QDRMsg = "{\"labels\":{\"check\":\"test\",\"client\":\"fedora\",\"severity\":\"OKAY\"},\"annotations\":{\"command\":\"echo 'wubba lubba dub dub'\",\"duration\":0.002853846,\"executed\":1675108402," + - "\"issued\":1675108402,\"output\":\"wubba lubba dub dub\\n\",\"status\":0,\"ves\":\"{\\\"commonEventHeader\\\":{\\\"domain\\\":\\\"heartbeat\\\",\\\"eventType\\\":\\\"checkResult\\\"," + + QDRMsg1 = "{\"labels\":{\"check\":\"test\",\"client\":\"fedora\",\"severity\":\"OKAY\"},\"annotations\":{\"command\":\"echo 'Test transfer'\",\"duration\":0.002853846,\"executed\":1675108402," + + "\"issued\":1675108402,\"output\":\"Test transfer\\n\",\"status\":0,\"ves\":\"{\\\"commonEventHeader\\\":{\\\"domain\\\":\\\"heartbeat\\\",\\\"eventType\\\":\\\"checkResult\\\"," + "\\\"eventId\\\":\\\"fedora-test\\\",\\\"priority\\\":\\\"Normal\\\",\\\"reportingEntityId\\\":\\\"c1d13353-82aa-4370-bc53-db0d60d79c12\\\",\\\"reportingEntityName\\\":\\\"fedora\\\"," + "\\\"sourceId\\\":\\\"c1d13353-82aa-4370-bc53-db0d60d79c12\\\",\\\"sourceName\\\":\\\"fedora-collectd-sensubility\\\",\\\"startingEpochMicrosec\\\":1675108402,\\\"lastEpochMicrosec\\\":1675108402}," + - "\\\"heartbeatFields\\\":{\\\"additionalFields\\\":{\\\"check\\\":\\\"test\\\",\\\"command\\\":\\\"echo 'wubba lubba dub dub'\\\",\\\"duration\\\":\\\"0.002854\\\",\\\"executed\":\\\"1675108402\"," + - "\\\"issued\\\":\\\"1675108402\\\",\\\"output\\\":\"wubba lubba dub dub\\n\\\",\\\"status\\\":\\\"0\\\"}}}\"},\"startsAt\":\"2023-01-30T20:53:22+01:00\"}}" + "\\\"heartbeatFields\\\":{\\\"additionalFields\\\":{\\\"check\\\":\\\"test\\\",\\\"command\\\":\\\"echo 'Test transfer'\\\",\\\"duration\\\":\\\"0.002854\\\",\\\"executed\":\\\"1675108402\"," + + "\\\"issued\\\":\\\"1675108402\\\",\\\"output\\\":\"Test transfer\\n\\\",\\\"status\\\":\\\"0\\\"}}}\"},\"startsAt\":\"2023-01-30T20:53:22+01:00\"}}" + QDRMsg2 = "{\"message\": \"smart gateway reconnect test\"}" ConfigContent = `{ "LogLevel": "Debug", "Amqp1": { "Connection": { "Address": "amqp://127.0.0.1:5666", - "SendTimeout": 2 + "SendTimeout": 2 }, "Client": { "Name": "connectortest" @@ -109,7 +111,7 @@ func TestUnixSocketSendAndReceiveMessage(t *testing.T) { defer logger.Destroy() metadata := map[string][]config.Parameter{ - "Socket": []config.Parameter{}, + "Socket": {}, } cfg := config.NewJSONConfig(metadata, logger) cfg.AddStructured("Socket", "In", ``, MockedSocket{}) @@ -188,8 +190,8 @@ func TestAMQP10SendAndReceiveMessage(t *testing.T) { defer logger.Destroy() metadata := map[string][]config.Parameter{ - "Amqp1": []config.Parameter{ - config.Parameter{Name: "LogFile", Tag: ``, Default: logpath, Validators: []config.Validator{}}, + "Amqp1": { + {Name: "LogFile", Tag: ``, Default: logpath, Validators: []config.Validator{}}, }, } cfg := config.NewJSONConfig(metadata, logger) @@ -201,7 +203,7 @@ func TestAMQP10SendAndReceiveMessage(t *testing.T) { t.Fatalf("Failed to parse config file: %s", err) } - conn, err := amqp10.ConnectAMQP10(cfg, logger) + conn, err := amqp10.ConnectAMQP10("ci", cfg, logger) if err != nil { t.Fatalf("Failed to connect to QDR: %s", err) } @@ -213,21 +215,59 @@ func TestAMQP10SendAndReceiveMessage(t *testing.T) { receiver := make(chan interface{}) sender := make(chan interface{}) - conn.Start(receiver, sender) + cwg := conn.Start(receiver, sender) - t.Run("Test receive", func(t *testing.T) { - t.Parallel() - for i := 0; i < 3; i++ { - data := <-receiver - assert.Equal(t, QDRMsg, (data.(amqp10.AMQP10Message)).Body) - } + t.Run("Test transport", func(t *testing.T) { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 3; i++ { + data := <-receiver + assert.Equal(t, QDRMsg1, (data.(amqp10.AMQP10Message)).Body) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg1} + sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg1} + sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg1} + }() + + wg.Wait() }) - t.Run("Test send and ACK", func(t *testing.T) { - t.Parallel() - sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg} - sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg} - sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg} + + t.Run("Test reconnect", func(t *testing.T) { + var wg sync.WaitGroup + + require.NoError(t, conn.Reconnect("in", receiver, cwg)) + require.NoError(t, conn.Reconnect("out", receiver, cwg)) + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 3; i++ { + data := <-receiver + assert.Equal(t, QDRMsg2, (data.(amqp10.AMQP10Message)).Body) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg2} + sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg2} + sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg2} + }() + + wg.Wait() + conn.Disconnect() + cwg.Wait() }) + } func TestLoki(t *testing.T) { @@ -244,8 +284,8 @@ func TestLoki(t *testing.T) { defer logger.Destroy() metadata := map[string][]config.Parameter{ - "Loki": []config.Parameter{ - config.Parameter{Name: "LogFile", Tag: ``, Default: logpath, Validators: []config.Validator{}}, + "Loki": { + {Name: "LogFile", Tag: ``, Default: logpath, Validators: []config.Validator{}}, }, } cfg := config.NewJSONConfig(metadata, logger) diff --git a/tests/iniconfig_test.go b/tests/iniconfig_test.go index 90303ca..15c451a 100644 --- a/tests/iniconfig_test.go +++ b/tests/iniconfig_test.go @@ -61,16 +61,16 @@ func TestINIConfigValues(t *testing.T) { defer log.Destroy() metadata := map[string][]config.Parameter{ - "default": []config.Parameter{ - config.Parameter{Name: "log_file", Tag: "", Default: "/var/log/collectd-sensubility.log", Validators: []config.Validator{}}, - config.Parameter{Name: "log_level", Tag: "", Default: "INFO", Validators: []config.Validator{config.StringOptionsValidatorFactory([]string{"DEBUG", "INFO", "WARNING", "ERROR"})}}, - config.Parameter{Name: "allow_exec", Tag: "", Default: true, Validators: []config.Validator{config.BoolValidatorFactory()}}, + "default": { + {Name: "log_file", Tag: "", Default: "/var/log/collectd-sensubility.log", Validators: []config.Validator{}}, + {Name: "log_level", Tag: "", Default: "INFO", Validators: []config.Validator{config.StringOptionsValidatorFactory([]string{"DEBUG", "INFO", "WARNING", "ERROR"})}}, + {Name: "allow_exec", Tag: "", Default: true, Validators: []config.Validator{config.BoolValidatorFactory()}}, }, - "amqp1": []config.Parameter{ - config.Parameter{Name: "host", Tag: "", Default: "localhost", Validators: []config.Validator{}}, - config.Parameter{Name: "port", Tag: "", Default: 5666, Validators: []config.Validator{config.IntValidatorFactory()}}, - config.Parameter{Name: "user", Tag: "", Default: "guest", Validators: []config.Validator{}}, - config.Parameter{Name: "password", Tag: "", Default: "guest", Validators: []config.Validator{}}, + "amqp1": { + {Name: "host", Tag: "", Default: "localhost", Validators: []config.Validator{}}, + {Name: "port", Tag: "", Default: 5666, Validators: []config.Validator{config.IntValidatorFactory()}}, + {Name: "user", Tag: "", Default: "guest", Validators: []config.Validator{}}, + {Name: "password", Tag: "", Default: "guest", Validators: []config.Validator{}}, }, } conf := config.NewINIConfig(metadata, log) @@ -120,15 +120,15 @@ func TestValidators(t *testing.T) { t.Run("Test parsed values from INI configuration file", func(t *testing.T) { tests := []ValidatorTest{ - ValidatorTest{"IntValidator", config.IntValidatorFactory(), "3"}, - ValidatorTest{"MultiIntValidator", config.MultiIntValidatorFactory(","), "1,2"}, - ValidatorTest{"BoolValidator", config.BoolValidatorFactory(), "true"}, - ValidatorTest{"OptionsValidator", config.StringOptionsValidatorFactory([]string{"bar", "baz"}), "bar"}, + {"IntValidator", config.IntValidatorFactory(), "3"}, + {"MultiIntValidator", config.MultiIntValidatorFactory(","), "1,2"}, + {"BoolValidator", config.BoolValidatorFactory(), "true"}, + {"OptionsValidator", config.StringOptionsValidatorFactory([]string{"bar", "baz"}), "bar"}, } for _, test := range tests { metadata := map[string][]config.Parameter{ - "invalid": []config.Parameter{ - config.Parameter{Name: test.Parameter, Tag: "", Default: test.defValue, Validators: []config.Validator{test.Validator}}, + "invalid": { + {Name: test.Parameter, Tag: "", Default: test.defValue, Validators: []config.Validator{test.Validator}}, }, } conf := config.NewINIConfig(metadata, log) @@ -141,8 +141,8 @@ func TestValidators(t *testing.T) { t.Run("Test of raising validation errors", func(t *testing.T) { metadata := map[string][]config.Parameter{ - "invalid": []config.Parameter{ - config.Parameter{Name: "default_test", Tag: "", Default: "default", Validators: []config.Validator{config.IntValidatorFactory()}}, + "invalid": { + {Name: "default_test", Tag: "", Default: "default", Validators: []config.Validator{config.IntValidatorFactory()}}, }, } conf := config.NewINIConfig(metadata, log) @@ -153,8 +153,8 @@ func TestValidators(t *testing.T) { t.Run("Test of fetching option dynamically", func(t *testing.T) { metadata := map[string][]config.Parameter{ - "default": []config.Parameter{ - config.Parameter{Name: "log_file", Tag: "", Default: "/var/log/collectd-sensubility.log", Validators: []config.Validator{}}, + "default": { + {Name: "log_file", Tag: "", Default: "/var/log/collectd-sensubility.log", Validators: []config.Validator{}}, }, } conf := config.NewINIConfig(metadata, log) diff --git a/tests/jsonconfig_test.go b/tests/jsonconfig_test.go index 325be16..51aad19 100644 --- a/tests/jsonconfig_test.go +++ b/tests/jsonconfig_test.go @@ -48,15 +48,15 @@ var ( } ` JSONConfigMetadata = map[string][]config.Parameter{ - "Default": []config.Parameter{ - config.Parameter{Name: "LogFile", Tag: `json:"log_file"`, Default: "/var/log/the.log", Validators: []config.Validator{}}, - config.Parameter{Name: "NoTag", Tag: "", Default: "notag", Validators: []config.Validator{}}, - config.Parameter{Name: "LogLevel", Tag: `json:"log_level"`, Default: "INFO", Validators: []config.Validator{config.StringOptionsValidatorFactory([]string{"DEBUG", "INFO", "WARNING", "ERROR"})}}, - config.Parameter{Name: "AllowExec", Tag: `json:"allow_exec"`, Default: true, Validators: []config.Validator{config.BoolValidatorFactory()}}, - config.Parameter{Name: "Port", Tag: `json:"port"`, Default: 5666, Validators: []config.Validator{config.IntValidatorFactory()}}, + "Default": { + {Name: "LogFile", Tag: `json:"log_file"`, Default: "/var/log/the.log", Validators: []config.Validator{}}, + {Name: "NoTag", Tag: "", Default: "notag", Validators: []config.Validator{}}, + {Name: "LogLevel", Tag: `json:"log_level"`, Default: "INFO", Validators: []config.Validator{config.StringOptionsValidatorFactory([]string{"DEBUG", "INFO", "WARNING", "ERROR"})}}, + {Name: "AllowExec", Tag: `json:"allow_exec"`, Default: true, Validators: []config.Validator{config.BoolValidatorFactory()}}, + {Name: "Port", Tag: `json:"port"`, Default: 5666, Validators: []config.Validator{config.IntValidatorFactory()}}, }, - "Amqp1": []config.Parameter{ - config.Parameter{Name: "Float", Tag: `json:"float"`, Default: 6.6, Validators: []config.Validator{}}, + "Amqp1": { + {Name: "Float", Tag: `json:"float"`, Default: 6.6, Validators: []config.Validator{}}, }, } ) @@ -122,7 +122,7 @@ func TestJSONConfigValues(t *testing.T) { connObj := conf.Sections["Amqp1"].Options["Connections"].GetStructured() connTypedObj := connObj.(OuterTestObject) assert.Equal(t, "woobalooba", connTypedObj.Test, "Did not parse correctly") - parsedConnections := []InnerTestObject{InnerTestObject{"test1", "booyaka"}, InnerTestObject{"test2", "foobar"}} + parsedConnections := []InnerTestObject{{"test1", "booyaka"}, {"test2", "foobar"}} assert.Equal(t, parsedConnections, connTypedObj.Connections, "Did not parse correctly") }) @@ -136,8 +136,8 @@ func TestJSONConfigValues(t *testing.T) { } cases := []DynamicFetchTest{ - DynamicFetchTest{"Amqp1.Connections.Test", "woobalooba"}, - DynamicFetchTest{"Default.LogFile", "/var/log/another.log"}, + {"Amqp1.Connections.Test", "woobalooba"}, + {"Default.LogFile", "/var/log/another.log"}, } for _, test := range cases { if opt, err := conf.GetOption(test.AddrStr); err != nil {