From a34c55d46496f9dbafc485f24c2a891a00856bf0 Mon Sep 17 00:00:00 2001 From: Martin Magr Date: Mon, 22 Feb 2021 14:53:03 +0100 Subject: [PATCH 1/3] Split AMQP10Connector --- connector/amqp10.go | 147 ++++++++++++++++++++++++++------------------ 1 file changed, 86 insertions(+), 61 deletions(-) diff --git a/connector/amqp10.go b/connector/amqp10.go index 1a481b4..326310f 100644 --- a/connector/amqp10.go +++ b/connector/amqp10.go @@ -12,13 +12,19 @@ import ( "github.com/apache/qpid-proton/go/pkg/electron" ) +const ( + defaultSendTimeout = 2 + defaultListenPrefetch = -1 + defaultClientName = "localhost" +) + //AMQP10Receiver is tagged electron receiver type AMQP10Receiver struct { Receiver electron.Receiver Tags []string } -//LokiConnector is the object to be used for communication with AMQP-1.0 entity +//AMQP10Connector is the object to be used for communication with AMQP-1.0 entity type AMQP10Connector struct { Address string ClientName string @@ -36,96 +42,115 @@ type AMQP10Message struct { Tags []string } +//CreateAMQP10Connector creates the connector and connects to given AMQP1.0 service +func CreateAMQP10Connector(logger *logging.Logger, address string, clientName string, sendTimeout int64, listenPrefetch int64, listenChannels []string) (*AMQP10Connector, error) { + connector := AMQP10Connector{ + Address: address, + ClientName: clientName, + SendTimeout: sendTimeout, + logger: logger, + receivers: make([]AMQP10Receiver, 0), + } + + // connect + if err := connector.Connect(); err != nil { + return &connector, fmt.Errorf("Error while connecting to AMQP") + } + // bind to channels + for _, channel := range listenChannels { + if len(channel) < 1 { + continue + } + logger.Metadata(map[string]interface{}{ + "channel": channel, + "prefetch": listenPrefetch, + }) + logger.Debug("Creating AMQP receiver for channel") + if err := connector.CreateReceiver(channel, int(listenPrefetch)); err != nil { + return &connector, fmt.Errorf("Failed to create receiver: %s", err) + } + } + return &connector, nil +} + //ConnectAMQP10 creates new AMQP1.0 connector from the given configuration file func ConnectAMQP10(cfg config.Config, logger *logging.Logger) (*AMQP10Connector, error) { - connector := AMQP10Connector{} - connector.receivers = make([]AMQP10Receiver, 0) - connector.logger = logger - var err error - // pre-connect initialization - var addr *config.Option + var opt *config.Option + switch conf := cfg.(type) { case *config.INIConfig: - addr, err = conf.GetOption("amqp1/connection") + opt, err = conf.GetOption("amqp1/connection") case *config.JSONConfig: - addr, err = conf.GetOption("Amqp1.Connection.Address") + opt, err = conf.GetOption("Amqp1.Connection.Address") default: - return &connector, fmt.Errorf("Unknown Config type") + return nil, fmt.Errorf("Unknown Config type") } - if err == nil && addr != nil { - connector.Address = addr.GetString() - } else { - return &connector, fmt.Errorf("Failed to get connection URL from configuration file: %s", err) + if err != nil { + return nil, err + } + if opt == nil { + return nil, fmt.Errorf("Failed to get connection URL from configuration file") } + addr := opt.GetString() - var sendTimeout *config.Option switch conf := cfg.(type) { case *config.INIConfig: - sendTimeout, err = conf.GetOption("amqp1/send_timeout") + opt, err = conf.GetOption("amqp1/send_timeout") case *config.JSONConfig: - sendTimeout, err = conf.GetOption("Amqp1.Connection.SendTimeout") + opt, err = conf.GetOption("Amqp1.Connection.SendTimeout") } - if err == nil && sendTimeout != nil { - connector.SendTimeout = sendTimeout.GetInt() - } else { - return &connector, fmt.Errorf("Failed to get send timeout from configuration file: %s", err) + if err != nil { + return nil, err + } + sendTimeout := int64(defaultSendTimeout) + if opt != nil { + sendTimeout = opt.GetInt() } - var clientName *config.Option switch conf := cfg.(type) { case *config.INIConfig: - clientName, err = conf.GetOption("amqp1/client_name") + opt, err = conf.GetOption("amqp1/client_name") case *config.JSONConfig: - clientName, err = conf.GetOption("Amqp1.Client.Name") + opt, err = conf.GetOption("Amqp1.Client.Name") } - if err == nil && clientName != nil { - connector.ClientName = clientName.GetString() - } else { - return &connector, fmt.Errorf("Failed to get client name from configuration file: %s", err) + if err != nil { + return nil, err + } + clientName := defaultClientName + if opt != nil { + clientName = opt.GetString() } - // connect - if err := connector.Connect(); err != nil { - return &connector, fmt.Errorf("Error while connecting to AMQP") + switch conf := cfg.(type) { + case *config.INIConfig: + opt, err = conf.GetOption("amqp1/listen_channels") + case *config.JSONConfig: + opt, err = conf.GetOption("Amqp1.Connection.ListenChannels") + } + if err != nil { + return nil, err + } + listen := []string{} + if opt != nil { + listen = opt.GetStrings(",") } - // post-connect initialization - var listen *config.Option switch conf := cfg.(type) { case *config.INIConfig: - listen, err = conf.GetOption("amqp1/listen_channels") + opt, err = conf.GetOption("amqp1/listen_prefetch") case *config.JSONConfig: - listen, err = conf.GetOption("Amqp1.Connection.ListenChannels") + opt, err = conf.GetOption("Amqp1.Connection.ListenPrefetch") } - if err == nil && listen != nil { - var prf *config.Option - switch conf := cfg.(type) { - case *config.INIConfig: - prf, err = conf.GetOption("amqp1/listen_prefetch") - case *config.JSONConfig: - prf, err = conf.GetOption("Amqp1.Connection.ListenPrefetch") - } - prefetch := int64(-1) - if err == nil && prf != nil { - prefetch = prf.GetInt() - } - for _, channel := range listen.GetStrings(",") { - if len(channel) < 1 { - continue - } - logger.Metadata(map[string]interface{}{ - "channel": channel, - "prefetch": prefetch, - }) - logger.Debug("Creating AMQP receiver for channel") - if err := connector.CreateReceiver(channel, int(prefetch)); err != nil { - return &connector, fmt.Errorf("Failed to create receiver: %s", err) - } - } + if err != nil { + return nil, err + } + prf := int64(defaultListenPrefetch) + if opt != nil { + prf = opt.GetInt() } - return &connector, nil + return CreateAMQP10Connector(logger, addr, clientName, sendTimeout, prf, listen) } //Connect creates input and output connection to configured AMQP1.0 node From 575da686e8e3cb4d6fb18fd7398917d79563f421 Mon Sep 17 00:00:00 2001 From: Martin Magr Date: Tue, 23 Feb 2021 13:38:53 +0100 Subject: [PATCH 2/3] Split SensuConnector --- connector/sensu.go | 111 +++++++++++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 44 deletions(-) diff --git a/connector/sensu.go b/connector/sensu.go index 3b1a585..d040b80 100644 --- a/connector/sensu.go +++ b/connector/sensu.go @@ -15,7 +15,9 @@ const ( //QueueNameKeepAlives is the name of queue used by Sensu server for receiving keepalive messages QueueNameKeepAlives = "keepalives" //QueueNameResults is the name of queue used by Sensu server for receiving check result messages - QueueNameResults = "results" + QueueNameResults = "results" + defaultClientAddress = "127.0.0.1" + defaultInterval = 30 ) //Result contains data about check execution @@ -69,83 +71,104 @@ type SensuConnector struct { consumer <-chan amqp.Delivery } +//CreateSensuConnector creates the connector and connects on given RabbitMQ service with Sensu server on appropriate channels +func CreateSensuConnector(logger *logging.Logger, address string, clientName string, clientAddress string, keepaliveInterval int64, subscriptions []string) (*SensuConnector, error) { + connector := SensuConnector{ + Address: address, + Subscription: subscriptions, + ClientName: clientName, + exchangeName: fmt.Sprintf("client:%s", clientName), + queueName: fmt.Sprintf("%s-infrawatch-%d", clientName, time.Now().Unix()), + ClientAddress: clientAddress, + KeepaliveInterval: keepaliveInterval, + logger: logger, + } + + if err := connector.Connect(); err != nil { + return &connector, fmt.Errorf("Error while connecting to RabbitMQ") + } + + return &connector, nil +} + //ConnectSensu creates new Sensu connector from the given configuration file func ConnectSensu(cfg config.Config, logger *logging.Logger) (*SensuConnector, error) { - connector := SensuConnector{} - connector.logger = logger - var err error - var addr *config.Option + var opt *config.Option + switch conf := cfg.(type) { case *config.INIConfig: - addr, err = conf.GetOption("sensu/connection") + opt, err = conf.GetOption("sensu/connection") case *config.JSONConfig: - addr, err = conf.GetOption("Sensu.Connection.Address") + opt, err = conf.GetOption("Sensu.Connection.Address") default: - return &connector, fmt.Errorf("Unknown Config type") + return nil, fmt.Errorf("Unknown Config type") + } + if err != nil { + return nil, err } - if err == nil && addr != nil { - connector.Address = addr.GetString() - } else { - return &connector, fmt.Errorf("Failed to get connection URL from configuration file") + if opt == nil { + return nil, fmt.Errorf("Failed to get connection URL from configuration file") } + addr := opt.GetString() - var subs *config.Option switch conf := cfg.(type) { case *config.INIConfig: - subs, err = conf.GetOption("sensu/subscriptions") + opt, err = conf.GetOption("sensu/subscriptions") case *config.JSONConfig: - subs, err = conf.GetOption("Sensu.Connection.Subscriptions") + opt, err = conf.GetOption("Sensu.Connection.Subscriptions") + } + if err != nil { + return nil, err } - if err == nil && subs != nil { - connector.Subscription = subs.GetStrings(",") - } else { - return &connector, fmt.Errorf("Failed to get subscription channels from configuration file") + subs := []string{"all"} + if opt != nil { + subs = opt.GetStrings(",") } - var clientName *config.Option switch conf := cfg.(type) { case *config.INIConfig: - clientName, err = conf.GetOption("sensu/client_name") + opt, err = conf.GetOption("sensu/client_name") case *config.JSONConfig: - clientName, err = conf.GetOption("Sensu.Client.Name") + opt, err = conf.GetOption("Sensu.Client.Name") + } + if err != nil { + return nil, err } - if err == nil && clientName != nil { - connector.ClientName = clientName.GetString() - connector.exchangeName = fmt.Sprintf("client:%s", clientName) - connector.queueName = fmt.Sprintf("%s-infrawatch-%d", clientName, time.Now().Unix()) - } else { - return &connector, fmt.Errorf("Failed to get client name from configuration file") + clientName := defaultClientName + if opt != nil { + clientName = opt.GetString() } - var clientAddr *config.Option switch conf := cfg.(type) { case *config.INIConfig: - clientAddr, err = conf.GetOption("sensu/client_address") + opt, err = conf.GetOption("sensu/client_address") case *config.JSONConfig: - clientAddr, err = conf.GetOption("Sensu.Client.Address") + opt, err = conf.GetOption("Sensu.Client.Address") } - if err == nil && clientAddr != nil { - connector.ClientAddress = clientAddr.GetString() - } else { - return &connector, fmt.Errorf("Failed to get client address from configuration file") + if err != nil { + return nil, err + } + clientAddr := defaultClientAddress + if opt != nil { + clientAddr = opt.GetString() } - var interval *config.Option switch conf := cfg.(type) { case *config.INIConfig: - interval, err = conf.GetOption("sensu/keepalive_interval") + opt, err = conf.GetOption("sensu/keepalive_interval") case *config.JSONConfig: - interval, err = conf.GetOption("Sensu.Connection.KeepaliveInterval") + opt, err = conf.GetOption("Sensu.Connection.KeepaliveInterval") + } + if err != nil { + return nil, err } - if err == nil && interval != nil { - connector.KeepaliveInterval = interval.GetInt() - } else { - return &connector, fmt.Errorf("Failed to get keepalive interval from configuration file") + interval := int64(defaultInterval) + if opt != nil { + interval = opt.GetInt() } - err = connector.Connect() - return &connector, err + return CreateSensuConnector(logger, addr, clientName, clientAddr, interval, subs) } //Connect connects to RabbitMQ server and From a728e01f27adcd843787a3754b81da61fb9eb505 Mon Sep 17 00:00:00 2001 From: Martin Magr Date: Tue, 23 Feb 2021 13:57:03 +0100 Subject: [PATCH 3/3] Split UnixSocketConnector --- connector/unix-socket.go | 90 ++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/connector/unix-socket.go b/connector/unix-socket.go index 878f611..18e849c 100644 --- a/connector/unix-socket.go +++ b/connector/unix-socket.go @@ -12,8 +12,8 @@ import ( const maxBufferSize = 4096 type socketInfo struct { - Address net.UnixAddr - Pc *net.UnixConn + Address net.UnixAddr + Pc *net.UnixConn } type UnixSocketConnector struct { @@ -27,18 +27,50 @@ type UnixSocketConnector struct { // and expects the file to already be created for the outgoing // socket. Both or only one of the directions can be configured. -func ConnectUnixSocket(cfg config.Config, logger *logging.Logger) (*UnixSocketConnector, error) { - connector := UnixSocketConnector { +//CreateUnixSocketConnector ... +func CreateUnixSocketConnector(logger *logging.Logger, inAddress string, outAddress string, maxBufferSize uint64) (*UnixSocketConnector, error) { + connector := UnixSocketConnector{ msgBuffer: make([]byte, maxBufferSize), logger: logger, out: &socketInfo{}, in: &socketInfo{}, } + if inAddress != "" { + connector.in.Address.Name = inAddress + connector.in.Address.Net = "unixgram" + connector.logger.Metadata(map[string]interface{}{ + "address": connector.in.Address.Name, + }) + connector.logger.Debug("In socket configured") + } else { + connector.in = nil + connector.logger.Debug("The in socket isn't configured") + } + + if outAddress != "" { + connector.out.Address.Name = outAddress + connector.out.Address.Net = "unixgram" + + connector.logger.Metadata(map[string]interface{}{ + "address": connector.out.Address.Name, + }) + connector.logger.Debug("Out socket configured") + } else { + connector.out = nil + connector.logger.Debug("The out socket isn't configured") + } + + err := connector.Connect() + return &connector, err +} + +//ConnectUnixSocket ... +func ConnectUnixSocket(cfg config.Config, logger *logging.Logger) (*UnixSocketConnector, error) { var err error - var address string + var inAddress, outAddress string - var inAddr *config.Option + var inAddr *config.Option switch conf := cfg.(type) { case *config.INIConfig: inAddr, err = conf.GetOption("socket/in_address") @@ -47,22 +79,9 @@ func ConnectUnixSocket(cfg config.Config, logger *logging.Logger) (*UnixSocketCo } if err == nil && inAddr != nil { - address = inAddr.GetString() + inAddress = inAddr.GetString() } else { - address = "" - } - if address != "" { - connector.in.Address.Name = address - connector.in.Address.Net = "unixgram" - - connector.logger.Metadata(map[string]interface{}{ - "address": connector.in.Address.Name, - }) - connector.logger.Debug("In socket configured") - } else { - connector.in = nil - - connector.logger.Debug("The in socket isn't configured") + inAddress = "" } var outAddr *config.Option @@ -74,33 +93,16 @@ func ConnectUnixSocket(cfg config.Config, logger *logging.Logger) (*UnixSocketCo } if err == nil && inAddr != nil { - address = outAddr.GetString() - } else { - address = "" - } - if address != "" { - connector.out.Address.Name = address - connector.out.Address.Net = "unixgram" - - connector.logger.Metadata(map[string]interface{}{ - "address": connector.out.Address.Name, - }) - connector.logger.Debug("Out socket configured") + outAddress = outAddr.GetString() } else { - connector.out = nil - - connector.logger.Debug("The out socket isn't configured") + outAddress = "" } - if connector.in == nil && connector.out == nil { + if outAddress == "" && inAddress == "" { return nil, fmt.Errorf("No socket was configured") } - err = connector.Connect() - if err != nil { - return nil, err - } - return &connector, nil + return CreateUnixSocketConnector(logger, inAddress, outAddress, maxBufferSize) } func (connector *UnixSocketConnector) connectSingleSocket(info *socketInfo, isIn bool) error { @@ -149,7 +151,7 @@ func (connector *UnixSocketConnector) Start(outchan chan interface{}, inchan cha n, err := connector.in.Pc.Read(connector.msgBuffer[:]) if err != nil || n < 1 { connector.logger.Metadata(map[string]interface{}{ - "error": err, + "error": err, "characters read": n, }) connector.logger.Debug("Error while trying to read from unix socket.") @@ -174,7 +176,7 @@ func (connector *UnixSocketConnector) Start(outchan chan interface{}, inchan cha n, err := connector.out.Pc.Write([]byte(message)) if err != nil || n < 1 { connector.logger.Metadata(map[string]interface{}{ - "error": err, + "error": err, "characters written": n, }) connector.logger.Debug("Error while trying to write to unix socket.")