Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for rabbitmq broker #56

Merged
merged 10 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ VERSION ?= $(shell git describe --abbrev=0 --tags)
COMMIT ?= $(shell git rev-parse HEAD)
TIME ?= $(shell date +%F_%T)

ifneq ($(MF_BROKER_TYPE),)
MF_BROKER_TYPE := $(MF_BROKER_TYPE)
else
MF_BROKER_TYPE=nats
endif

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \
go build -mod=vendor -ldflags "-s -w \
go build -mod=vendor -tags $(MF_BROKER_TYPE) -ldflags "-s -w \
-X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \
-X 'github.com/mainflux/mainflux.Version=$(VERSION)' \
-X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \
Expand Down
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Example configuration:
username = ""

[Agent.server]
nats_url = "localhost:4222"
broker_url = "localhost:4222"
port = "9999"

```
Expand All @@ -105,7 +105,7 @@ Environment:
| MF_AGENT_CONTROL_CHANNEL | Channel for sending controls, commands | |
| MF_AGENT_DATA_CHANNEL | Channel for data sending | |
| MF_AGENT_ENCRYPTION | Encryption | false |
| MF_AGENT_NATS_URL | Nats url | nats://localhost:4222 |
| MF_AGENT_BROKER_URL | Broker url | nats://localhost:4222 |
SammyOina marked this conversation as resolved.
Show resolved Hide resolved
| MF_AGENT_MQTT_USERNAME | MQTT username, Mainflux thing id | |
| MF_AGENT_MQTT_PASSWORD | MQTT password, Mainflux thing key | |
| MF_AGENT_MQTT_SKIP_TLS | Skip TLS verification for MQTT | true |
Expand All @@ -122,11 +122,11 @@ Here `thing` is a Mainflux thing, and control channel from `channels` is used wi
(i.e. app needs to PUB/SUB on `/channels/<control_channel_id>/messages/req` and `/channels/<control_channel_id>/messages/res`).

## Sending commands to other services
You can send commands to other services that are subscribed on the same Nats server as Agent.
You can send commands to other services that are subscribed on the same Broker as Agent.
Commands are being sent via MQTT to topic:
* `channels/<control_channel_id>/messages/services/<service_name>/<subtopic>`

when messages is received Agent forwards them to Nats on subject:
when messages is received Agent forwards them to Broker on subject:
* `commands.<service_name>.<subtopic>`.

Payload is up to the application and service itself.
Expand All @@ -143,6 +143,12 @@ Agent will keep a record on those service and update their `live` status.
If heartbeat is not received in 10 sec it marks it `offline`.
Upon next heartbeat service will be marked `online` again.

To test heartbeat run:
```bash
go run -tags <broker_name> ./examples/publish/main.go -s <broker_url> heartbeat.<service-name>.<service-type> "";
```
Broker names include: nats and rabbitmq.

To check services that are currently registered to agent you can:

```bash
Expand Down
23 changes: 12 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
nats "github.com/nats-io/nats.go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -92,6 +93,9 @@ var (
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

cfg, err := loadEnvConfig()
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to load config: %s", err))
Expand All @@ -107,12 +111,11 @@ func main() {
logger.Error(fmt.Sprintf("Failed to load config: %s", err))
}

nc, err := nats.Connect(cfg.Server.NatsURL)
pubsub, err := brokers.NewPubSub(cfg.Server.BrokerURL, "", logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Server.NatsURL))
os.Exit(1)
logger.Fatal(fmt.Sprintf("Failed to connect to Broker: %s %s", err, cfg.Server.BrokerURL))
}
defer nc.Close()
defer pubsub.Close()

mqttClient, err := connectToMQTTBroker(cfg.MQTT, logger)
if err != nil {
Expand All @@ -121,7 +124,7 @@ func main() {
}
edgexClient := edgex.NewClient(cfg.Edgex.URL, logger)

svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger)
svc, err := agent.New(ctx, mqttClient, &cfg, edgexClient, pubsub, logger)
if err != nil {
logger.Error(fmt.Sprintf("Error in agent service: %s", err))
return
Expand All @@ -143,17 +146,15 @@ func main() {
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
b := conn.NewBroker(svc, mqttClient, cfg.Channels.Control, nc, logger)
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
b := conn.NewBroker(svc, mqttClient, cfg.Channels.Control, pubsub, logger)

srv := &http.Server{
Addr: fmt.Sprintf(":%s", cfg.Server.Port),
Handler: api.MakeHandler(svc),
}

g.Go(func() error {
return b.Subscribe()
return b.Subscribe(ctx)
})

g.Go(func() error {
Expand All @@ -172,8 +173,8 @@ func main() {

func loadEnvConfig() (agent.Config, error) {
sc := agent.ServerConfig{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envHTTPPort, defHTTPPort),
BrokerURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envHTTPPort, defHTTPPort),
}
cc := agent.ChanConfig{
Control: mainflux.Env(envCtrlChan, defCtrlChan),
Expand Down
2 changes: 1 addition & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ File = "config.toml"
username = "77d74527-7457-4dc2-9b36-01f01ce62726"

[server]
nats_url = "localhost:4222"
broker_url = "amqp://mainflux:mainflux@localhost:5672/"
SammyOina marked this conversation as resolved.
Show resolved Hide resolved
port = "9999"

[terminal]
Expand Down
64 changes: 64 additions & 0 deletions examples/publish/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main
SammyOina marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"flag"
"log"
"os"

mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
"github.com/nats-io/nats.go"
)

func main() {
var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
var showHelp = flag.Bool("h", false, "Show help message")

log.SetFlags(0)
flag.Usage = usage
flag.Parse()

if *showHelp {
showUsageAndExit(0)
}

args := flag.Args()
if len(args) != 2 {
showUsageAndExit(1)
}

subj, msg := args[0], []byte(args[1])

logger, err := mflog.New(os.Stdout, "info")
if err != nil {
log.Fatalf("failed to init logger: %s", err)
}

ps, err := brokers.NewPublisher(*urls)
if err != nil {
logger.Error(err.Error())
return
}
defer ps.Close()

if err := ps.Publish(context.Background(), subj, &messaging.Message{
Channel: subj,
Payload: msg,
}); err != nil {
logger.Error(err.Error())
return
}
logger.Info("Message published")
}

func usage() {
log.Printf("Usage: publish [-s server] <channel> <msg>\n")
flag.PrintDefaults()
}

func showUsageAndExit(exitcode int) {
usage()
os.Exit(exitcode)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
Expand Down Expand Up @@ -160,6 +163,7 @@ github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
Expand Down Expand Up @@ -197,6 +201,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/go-kit/kit/otelkit v0.42.
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down Expand Up @@ -279,6 +285,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
14 changes: 8 additions & 6 deletions pkg/agent/api/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package api_test

import (
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -18,9 +19,9 @@ import (
"github.com/mainflux/agent/pkg/agent"
"github.com/mainflux/agent/pkg/agent/api"
"github.com/mainflux/agent/pkg/agent/mocks"
"github.com/nats-io/nats.go"

"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
"github.com/stretchr/testify/assert"
)

Expand All @@ -40,7 +41,7 @@ func (tr testRequest) make() (*http.Response, error) {
return tr.client.Do(req)
}

func newService() (agent.Service, error) {
func newService(ctx context.Context) (agent.Service, error) {
opts := paho.NewClientOptions().
SetUsername(username).
AddBroker(mqttAddress).
Expand All @@ -61,12 +62,13 @@ func newService() (agent.Service, error) {
return nil, err
}

nc, err := nats.Connect(natsAddress)
pubsub, err := brokers.NewPubSub(natsAddress, "", logger)
SammyOina marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
return nil, fmt.Errorf("Failed to connect to Broker: %s %s", err, natsAddress)
}
defer pubsub.Close()

agentSvc, err := agent.New(mqttClient, &config, edgexClient, nc, logger)
agentSvc, err := agent.New(ctx, mqttClient, &config, edgexClient, pubsub, logger)
if err != nil {
return nil, err
}
Expand All @@ -85,7 +87,7 @@ func toJSON(data interface{}) string {
}

func TestPublish(t *testing.T) {
svc, err := newService()
svc, err := newService(context.TODO())
if err != nil {
t.Errorf("failed to create service: %v", err)
return
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package api

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -84,7 +85,7 @@ func (lm loggingMiddleware) Config() agent.Config {
return lm.svc.Config()
}

func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) {
func (lm loggingMiddleware) ServiceConfig(ctx context.Context, uuid, cmdStr string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method service_config took %s to complete", time.Since(begin))
if err != nil {
Expand All @@ -94,7 +95,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) {
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.ServiceConfig(uuid, cmdStr)
return lm.svc.ServiceConfig(ctx, uuid, cmdStr)
}

func (lm loggingMiddleware) Services() []agent.Info {
Expand Down
6 changes: 4 additions & 2 deletions pkg/agent/api/metrics.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

//go:build !test
// +build !test

package api

import (
"context"
"time"

"github.com/go-kit/kit/metrics"
Expand Down Expand Up @@ -56,13 +58,13 @@ func (ms *metricsMiddleware) AddConfig(ec agent.Config) error {
return ms.svc.AddConfig(ec)
}

func (ms *metricsMiddleware) ServiceConfig(uuid, cmdStr string) error {
func (ms *metricsMiddleware) ServiceConfig(ctx context.Context, uuid, cmdStr string) error {
defer func(begin time.Time) {
ms.counter.With("method", "service_config").Add(1)
ms.latency.With("method", "service_config").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.ServiceConfig(uuid, cmdStr)
return ms.svc.ServiceConfig(ctx, uuid, cmdStr)
}

func (ms *metricsMiddleware) Config() agent.Config {
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

type ServerConfig struct {
Port string `toml:"port" json:"port"`
NatsURL string `toml:"nats_url" json:"nats_url"`
Port string `toml:"port" json:"port"`
BrokerURL string `toml:"broker_url" json:"broker_url"`
}

type ChanConfig struct {
Expand Down
Loading