Skip to content

Commit

Permalink
Split AMQP10Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
paramite committed Feb 23, 2021
1 parent 31d051e commit a34c55d
Showing 1 changed file with 86 additions and 61 deletions.
147 changes: 86 additions & 61 deletions connector/amqp10.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (
"github.com/apache/qpid-proton/go/pkg/electron"
)

const (
defaultSendTimeout = 2
defaultListenPrefetch = -1
defaultClientName = "localhost"
)

//AMQP10Receiver is tagged electron receiver
type AMQP10Receiver struct {
Receiver electron.Receiver
Tags []string
}

//LokiConnector is the object to be used for communication with AMQP-1.0 entity
//AMQP10Connector is the object to be used for communication with AMQP-1.0 entity
type AMQP10Connector struct {
Address string
ClientName string
Expand All @@ -36,96 +42,115 @@ type AMQP10Message struct {
Tags []string
}

//CreateAMQP10Connector creates the connector and connects to given AMQP1.0 service
func CreateAMQP10Connector(logger *logging.Logger, address string, clientName string, sendTimeout int64, listenPrefetch int64, listenChannels []string) (*AMQP10Connector, error) {
connector := AMQP10Connector{
Address: address,
ClientName: clientName,
SendTimeout: sendTimeout,
logger: logger,
receivers: make([]AMQP10Receiver, 0),
}

// connect
if err := connector.Connect(); err != nil {
return &connector, fmt.Errorf("Error while connecting to AMQP")
}
// bind to channels
for _, channel := range listenChannels {
if len(channel) < 1 {
continue
}
logger.Metadata(map[string]interface{}{
"channel": channel,
"prefetch": listenPrefetch,
})
logger.Debug("Creating AMQP receiver for channel")
if err := connector.CreateReceiver(channel, int(listenPrefetch)); err != nil {
return &connector, fmt.Errorf("Failed to create receiver: %s", err)
}
}
return &connector, nil
}

//ConnectAMQP10 creates new AMQP1.0 connector from the given configuration file
func ConnectAMQP10(cfg config.Config, logger *logging.Logger) (*AMQP10Connector, error) {
connector := AMQP10Connector{}
connector.receivers = make([]AMQP10Receiver, 0)
connector.logger = logger

var err error
// pre-connect initialization
var addr *config.Option
var opt *config.Option

switch conf := cfg.(type) {
case *config.INIConfig:
addr, err = conf.GetOption("amqp1/connection")
opt, err = conf.GetOption("amqp1/connection")
case *config.JSONConfig:
addr, err = conf.GetOption("Amqp1.Connection.Address")
opt, err = conf.GetOption("Amqp1.Connection.Address")
default:
return &connector, fmt.Errorf("Unknown Config type")
return nil, fmt.Errorf("Unknown Config type")
}
if err == nil && addr != nil {
connector.Address = addr.GetString()
} else {
return &connector, fmt.Errorf("Failed to get connection URL from configuration file: %s", err)
if err != nil {
return nil, err
}
if opt == nil {
return nil, fmt.Errorf("Failed to get connection URL from configuration file")
}
addr := opt.GetString()

var sendTimeout *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
sendTimeout, err = conf.GetOption("amqp1/send_timeout")
opt, err = conf.GetOption("amqp1/send_timeout")
case *config.JSONConfig:
sendTimeout, err = conf.GetOption("Amqp1.Connection.SendTimeout")
opt, err = conf.GetOption("Amqp1.Connection.SendTimeout")
}
if err == nil && sendTimeout != nil {
connector.SendTimeout = sendTimeout.GetInt()
} else {
return &connector, fmt.Errorf("Failed to get send timeout from configuration file: %s", err)
if err != nil {
return nil, err
}
sendTimeout := int64(defaultSendTimeout)
if opt != nil {
sendTimeout = opt.GetInt()
}

var clientName *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
clientName, err = conf.GetOption("amqp1/client_name")
opt, err = conf.GetOption("amqp1/client_name")
case *config.JSONConfig:
clientName, err = conf.GetOption("Amqp1.Client.Name")
opt, err = conf.GetOption("Amqp1.Client.Name")
}
if err == nil && clientName != nil {
connector.ClientName = clientName.GetString()
} else {
return &connector, fmt.Errorf("Failed to get client name from configuration file: %s", err)
if err != nil {
return nil, err
}
clientName := defaultClientName
if opt != nil {
clientName = opt.GetString()
}

// connect
if err := connector.Connect(); err != nil {
return &connector, fmt.Errorf("Error while connecting to AMQP")
switch conf := cfg.(type) {
case *config.INIConfig:
opt, err = conf.GetOption("amqp1/listen_channels")
case *config.JSONConfig:
opt, err = conf.GetOption("Amqp1.Connection.ListenChannels")
}
if err != nil {
return nil, err
}
listen := []string{}
if opt != nil {
listen = opt.GetStrings(",")
}

// post-connect initialization
var listen *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
listen, err = conf.GetOption("amqp1/listen_channels")
opt, err = conf.GetOption("amqp1/listen_prefetch")
case *config.JSONConfig:
listen, err = conf.GetOption("Amqp1.Connection.ListenChannels")
opt, err = conf.GetOption("Amqp1.Connection.ListenPrefetch")
}
if err == nil && listen != nil {
var prf *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
prf, err = conf.GetOption("amqp1/listen_prefetch")
case *config.JSONConfig:
prf, err = conf.GetOption("Amqp1.Connection.ListenPrefetch")
}
prefetch := int64(-1)
if err == nil && prf != nil {
prefetch = prf.GetInt()
}
for _, channel := range listen.GetStrings(",") {
if len(channel) < 1 {
continue
}
logger.Metadata(map[string]interface{}{
"channel": channel,
"prefetch": prefetch,
})
logger.Debug("Creating AMQP receiver for channel")
if err := connector.CreateReceiver(channel, int(prefetch)); err != nil {
return &connector, fmt.Errorf("Failed to create receiver: %s", err)
}
}
if err != nil {
return nil, err
}
prf := int64(defaultListenPrefetch)
if opt != nil {
prf = opt.GetInt()
}

return &connector, nil
return CreateAMQP10Connector(logger, addr, clientName, sendTimeout, prf, listen)
}

//Connect creates input and output connection to configured AMQP1.0 node
Expand Down

0 comments on commit a34c55d

Please sign in to comment.