Skip to content

Commit

Permalink
refactor: improve publisher initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
restuhaqza committed Nov 24, 2021
1 parent 042e70f commit f7d9368
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 24 deletions.
27 changes: 5 additions & 22 deletions amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"log"

"github.com/Gate2Up/rabbitmq-go/publisher"
"github.com/Gate2Up/rabbitmq-go/subscriber"
"github.com/streadway/amqp"
)
Expand Down Expand Up @@ -38,28 +37,12 @@ func NewClient(config Config) (*AmqpClient, error) {
return &AmqpClient{Connection: amqpConn, ServiceName: config.ServiceName}, nil
}

func (a *AmqpClient) AddPublisher(publisher *publisher.PublisherConfig) {
channel, err := a.Connection.Channel()
if err != nil {
log.Println(err.Error())
return
}

err = channel.ExchangeDeclare(
publisher.TopicName,
amqp.ExchangeTopic,
true,
false,
false,
false,
nil,
)

if err != nil {
log.Println(`Create exchange failed: `, err.Error())
}
type Publisher interface {
Build(client *AmqpClient)
}

log.Println(fmt.Sprintf(`Exchange: %s created`, publisher.TopicName))
func (a *AmqpClient) AddPublisher(publisher Publisher) {
publisher.Build(a)
}

func (a *AmqpClient) AddSubscriber(subscriber *subscriber.SubscriberConfig) {
Expand Down
8 changes: 6 additions & 2 deletions amqp/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ func TestNewClient(t *testing.T) {

func TestAddPublisher(t *testing.T) {

publisherConfig := publisher.NewPublisher("TEST_TOPIC", nil)
publisher := publisher.NewPublisher("TEST_TOPIC", nil)
conn, _ := amqp.NewClient(config)

// if no error this case is passed - void
conn.AddPublisher(publisherConfig)
conn.AddPublisher(publisher)
status, err := publisher.Publish([]byte(`Hello World`))

assert.Equal(t, status, true)
assert.Equal(t, err, nil)
}

func TestAddSubscriber(t *testing.T) {
Expand Down
61 changes: 61 additions & 0 deletions publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package publisher

import (
"fmt"
"log"

"github.com/Gate2Up/rabbitmq-go/amqp"
amqpLegacy "github.com/streadway/amqp"
)

type PublisherConfig struct {
TopicName string
Schema interface{}
Client *amqp.AmqpClient
}

type schemaType interface{}
Expand All @@ -11,7 +20,59 @@ func NewPublisher(topicName string, schema schemaType) *PublisherConfig {
publisherConfig := PublisherConfig{
TopicName: topicName,
Schema: schema,
Client: nil,
}

return &publisherConfig
}

func (p *PublisherConfig) Build(client *amqp.AmqpClient) {

if client == nil {
log.Fatalln(`amqp client is nil`)
}

p.Client = client

channel, err := client.Connection.Channel()
if err != nil {
log.Println(err.Error())
return
}

err = channel.ExchangeDeclare(
p.TopicName,
amqpLegacy.ExchangeTopic,
true,
false,
false,
false,
nil,
)

if err != nil {
log.Println(`Create exchange failed: `, err.Error())
}

log.Println(fmt.Sprintf(`Exchange: %s created`, p.TopicName))
}

func (p *PublisherConfig) Publish(data []byte) (bool, error) {
channel, err := p.Client.Connection.Channel()
if err != nil {
return false, err
}

content := amqpLegacy.Publishing{
ContentType: "text/plain",
Body: data,
}

err = channel.Publish(p.TopicName, "*", true, true, content)

if err != nil {
return false, err
}

return true, nil
}

0 comments on commit f7d9368

Please sign in to comment.