diff --git a/connector/amqp10.go b/connector/amqp10.go index 1a481b4..326310f 100644 --- a/connector/amqp10.go +++ b/connector/amqp10.go @@ -12,13 +12,19 @@ import ( "github.com/apache/qpid-proton/go/pkg/electron" ) +const ( + defaultSendTimeout = 2 + defaultListenPrefetch = -1 + defaultClientName = "localhost" +) + //AMQP10Receiver is tagged electron receiver type AMQP10Receiver struct { Receiver electron.Receiver Tags []string } -//LokiConnector is the object to be used for communication with AMQP-1.0 entity +//AMQP10Connector is the object to be used for communication with AMQP-1.0 entity type AMQP10Connector struct { Address string ClientName string @@ -36,96 +42,115 @@ type AMQP10Message struct { Tags []string } +//CreateAMQP10Connector creates the connector and connects to given AMQP1.0 service +func CreateAMQP10Connector(logger *logging.Logger, address string, clientName string, sendTimeout int64, listenPrefetch int64, listenChannels []string) (*AMQP10Connector, error) { + connector := AMQP10Connector{ + Address: address, + ClientName: clientName, + SendTimeout: sendTimeout, + logger: logger, + receivers: make([]AMQP10Receiver, 0), + } + + // connect + if err := connector.Connect(); err != nil { + return &connector, fmt.Errorf("Error while connecting to AMQP") + } + // bind to channels + for _, channel := range listenChannels { + if len(channel) < 1 { + continue + } + logger.Metadata(map[string]interface{}{ + "channel": channel, + "prefetch": listenPrefetch, + }) + logger.Debug("Creating AMQP receiver for channel") + if err := connector.CreateReceiver(channel, int(listenPrefetch)); err != nil { + return &connector, fmt.Errorf("Failed to create receiver: %s", err) + } + } + return &connector, nil +} + //ConnectAMQP10 creates new AMQP1.0 connector from the given configuration file func ConnectAMQP10(cfg config.Config, logger *logging.Logger) (*AMQP10Connector, error) { - connector := AMQP10Connector{} - connector.receivers = make([]AMQP10Receiver, 0) - connector.logger = logger - var err error - // pre-connect initialization - var addr *config.Option + var opt *config.Option + switch conf := cfg.(type) { case *config.INIConfig: - addr, err = conf.GetOption("amqp1/connection") + opt, err = conf.GetOption("amqp1/connection") case *config.JSONConfig: - addr, err = conf.GetOption("Amqp1.Connection.Address") + opt, err = conf.GetOption("Amqp1.Connection.Address") default: - return &connector, fmt.Errorf("Unknown Config type") + return nil, fmt.Errorf("Unknown Config type") } - if err == nil && addr != nil { - connector.Address = addr.GetString() - } else { - return &connector, fmt.Errorf("Failed to get connection URL from configuration file: %s", err) + if err != nil { + return nil, err + } + if opt == nil { + return nil, fmt.Errorf("Failed to get connection URL from configuration file") } + addr := opt.GetString() - var sendTimeout *config.Option switch conf := cfg.(type) { case *config.INIConfig: - sendTimeout, err = conf.GetOption("amqp1/send_timeout") + opt, err = conf.GetOption("amqp1/send_timeout") case *config.JSONConfig: - sendTimeout, err = conf.GetOption("Amqp1.Connection.SendTimeout") + opt, err = conf.GetOption("Amqp1.Connection.SendTimeout") } - if err == nil && sendTimeout != nil { - connector.SendTimeout = sendTimeout.GetInt() - } else { - return &connector, fmt.Errorf("Failed to get send timeout from configuration file: %s", err) + if err != nil { + return nil, err + } + sendTimeout := int64(defaultSendTimeout) + if opt != nil { + sendTimeout = opt.GetInt() } - var clientName *config.Option switch conf := cfg.(type) { case *config.INIConfig: - clientName, err = conf.GetOption("amqp1/client_name") + opt, err = conf.GetOption("amqp1/client_name") case *config.JSONConfig: - clientName, err = conf.GetOption("Amqp1.Client.Name") + opt, err = conf.GetOption("Amqp1.Client.Name") } - if err == nil && clientName != nil { - connector.ClientName = clientName.GetString() - } else { - return &connector, fmt.Errorf("Failed to get client name from configuration file: %s", err) + if err != nil { + return nil, err + } + clientName := defaultClientName + if opt != nil { + clientName = opt.GetString() } - // connect - if err := connector.Connect(); err != nil { - return &connector, fmt.Errorf("Error while connecting to AMQP") + switch conf := cfg.(type) { + case *config.INIConfig: + opt, err = conf.GetOption("amqp1/listen_channels") + case *config.JSONConfig: + opt, err = conf.GetOption("Amqp1.Connection.ListenChannels") + } + if err != nil { + return nil, err + } + listen := []string{} + if opt != nil { + listen = opt.GetStrings(",") } - // post-connect initialization - var listen *config.Option switch conf := cfg.(type) { case *config.INIConfig: - listen, err = conf.GetOption("amqp1/listen_channels") + opt, err = conf.GetOption("amqp1/listen_prefetch") case *config.JSONConfig: - listen, err = conf.GetOption("Amqp1.Connection.ListenChannels") + opt, err = conf.GetOption("Amqp1.Connection.ListenPrefetch") } - if err == nil && listen != nil { - var prf *config.Option - switch conf := cfg.(type) { - case *config.INIConfig: - prf, err = conf.GetOption("amqp1/listen_prefetch") - case *config.JSONConfig: - prf, err = conf.GetOption("Amqp1.Connection.ListenPrefetch") - } - prefetch := int64(-1) - if err == nil && prf != nil { - prefetch = prf.GetInt() - } - for _, channel := range listen.GetStrings(",") { - if len(channel) < 1 { - continue - } - logger.Metadata(map[string]interface{}{ - "channel": channel, - "prefetch": prefetch, - }) - logger.Debug("Creating AMQP receiver for channel") - if err := connector.CreateReceiver(channel, int(prefetch)); err != nil { - return &connector, fmt.Errorf("Failed to create receiver: %s", err) - } - } + if err != nil { + return nil, err + } + prf := int64(defaultListenPrefetch) + if opt != nil { + prf = opt.GetInt() } - return &connector, nil + return CreateAMQP10Connector(logger, addr, clientName, sendTimeout, prf, listen) } //Connect creates input and output connection to configured AMQP1.0 node