Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for rabbitmq broker #56

Merged
merged 10 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ VERSION ?= $(shell git describe --abbrev=0 --tags)
COMMIT ?= $(shell git rev-parse HEAD)
TIME ?= $(shell date +%F_%T)

ifneq ($(MF_BROKER_TYPE),)
MF_BROKER_TYPE := $(MF_BROKER_TYPE)
else
MF_BROKER_TYPE=nats
endif

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \
go build -mod=vendor -ldflags "-s -w \
go build -mod=vendor -tags $(MF_BROKER_TYPE) -ldflags "-s -w \
-X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \
-X 'github.com/mainflux/mainflux.Version=$(VERSION)' \
-X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \
Expand Down
42 changes: 33 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Mainflux IoT Agent

![](https://github.com/mainflux/agent/workflows/Go/badge.svg)
![badge](https://github.com/mainflux/agent/workflows/Go/badge.svg)
![ci][ci]
![release][release]
[![go report card][grc-badge]][grc-url]
Expand All @@ -14,6 +14,7 @@
Mainflux IoT Agent is a communication, execution and SW management agent for Mainflux system.

## Install

Get the code:

```bash
Expand All @@ -22,11 +23,13 @@ cd $GOPATH/github.com/mainflux/agent
```

Make:

```bash
make
```

## Usage

Get Nats server and start it, by default it starts on port `4222`

```bash
Expand All @@ -44,6 +47,7 @@ MF_AGENT_BOOTSTRAP_KEY=<bootstrap_key> \
MF_AGENT_BOOTSTRAP_URL=http://localhost:9013/things/bootstrap \
build/mainflux-agent
```

or,if [Mainflux UI](https://github.com/mainflux/ui) is used,

```bash
Expand All @@ -54,9 +58,11 @@ build/mainflux-agent
```

### Config

Agent configuration is kept in `config.toml` if not otherwise specified with env var.

Example configuration:

```toml
[Agent]

Expand All @@ -83,7 +89,7 @@ Example configuration:
username = ""

[Agent.server]
nats_url = "localhost:4222"
broker_url = "localhost:4222"
port = "9999"

```
Expand All @@ -105,7 +111,7 @@ Environment:
| MF_AGENT_CONTROL_CHANNEL | Channel for sending controls, commands | |
| MF_AGENT_DATA_CHANNEL | Channel for data sending | |
| MF_AGENT_ENCRYPTION | Encryption | false |
| MF_AGENT_NATS_URL | Nats url | nats://localhost:4222 |
| MF_AGENT_BROKER_URL | Broker url | nats://localhost:4222 |
| MF_AGENT_MQTT_USERNAME | MQTT username, Mainflux thing id | |
| MF_AGENT_MQTT_PASSWORD | MQTT password, Mainflux thing key | |
| MF_AGENT_MQTT_SKIP_TLS | Skip TLS verification for MQTT | true |
Expand All @@ -122,12 +128,15 @@ Here `thing` is a Mainflux thing, and control channel from `channels` is used wi
(i.e. app needs to PUB/SUB on `/channels/<control_channel_id>/messages/req` and `/channels/<control_channel_id>/messages/res`).

## Sending commands to other services
You can send commands to other services that are subscribed on the same Nats server as Agent.
Commands are being sent via MQTT to topic:
* `channels/<control_channel_id>/messages/services/<service_name>/<subtopic>`

when messages is received Agent forwards them to Nats on subject:
* `commands.<service_name>.<subtopic>`.
You can send commands to other services that are subscribed on the same Broker as Agent.
Commands are being sent via MQTT to topic:

* `channels/<control_channel_id>/messages/services/<service_name>/<subtopic>`

when messages is received Agent forwards them to Broker on subject:

* `commands.<service_name>.<subtopic>`.

Payload is up to the application and service itself.

Expand All @@ -138,16 +147,26 @@ mosquitto_pub -u <thing_id> -P <thing_key> -t channels/<control_channel_id>/mess
```

## Heartbeat service

Services running on the same host can publish to `heartbeat.<service-name>.<service-type>` a heartbeat message.
Agent will keep a record on those service and update their `live` status.
If heartbeat is not received in 10 sec it marks it `offline`.
Upon next heartbeat service will be marked `online` again.

To test heartbeat run:

```bash
go run -tags <broker_name> ./examples/publish/main.go -s <broker_url> heartbeat.<service-name>.<service-type> "";
```

Broker names include: nats and rabbitmq.

To check services that are currently registered to agent you can:

```bash
curl -s -S X GET http://localhost:9999/services
```

```json
[
{
Expand Down Expand Up @@ -196,23 +215,28 @@ Check the output in terminal where you subscribed for results. You should see so


## How to save config via agent

Agent can be used to send configuration file for the [Export][export] service from cloud to gateway via MQTT.
Here is the example command:

```bash
mosquitto_pub -u <thing_id> -P <thing_key> -t channels/<control_channel_id>/messages/req -h localhost -p 1883 -m "[{\"bn\":\"1:\", \"n\":\"config\", \"vs\":\"<config_file_path>, <file_content_base64>\"}]"

```

* `<config_file_path>` - file path where to save contents
* `<file_content_base64>` - file content, base64 encoded marshaled toml.

Here is an example how to make payload for the command:

```go
b,_ := toml.Marshal(export.Config)
payload := base64.StdEncoding.EncodeToString(b)
```

Example payload:
```

```text
RmlsZSA9ICIuLi9jb25maWdzL2NvbmZpZy50b21sIgoKW2V4cF0KICBsb2dfbGV2ZWwgPSAiZGVidWciCiAgbmF0cyA9ICJuYXRzOi8vMTI3LjAuMC4xOjQyMjIiCiAgcG9ydCA9ICI4MTcwIgoKW21xdHRdCiAgY2FfcGF0aCA9ICJjYS5jcnQiCiAgY2VydF9wYXRoID0gInRoaW5nLmNydCIKICBjaGFubmVsID0gIiIKICBob3N0ID0gInRjcDovL2xvY2FsaG9zdDoxODgzIgogIG10bHMgPSBmYWxzZQogIHBhc3N3b3JkID0gImFjNmI1N2UwLTliNzAtNDVkNi05NGM4LWU2N2FjOTA4NjE2NSIKICBwcml2X2tleV9wYXRoID0gInRoaW5nLmtleSIKICBxb3MgPSAwCiAgcmV0YWluID0gZmFsc2UKICBza2lwX3Rsc192ZXIgPSBmYWxzZQogIHVzZXJuYW1lID0gIjRhNDM3ZjQ2LWRhN2ItNDQ2OS05NmI3LWJlNzU0YjVlOGQzNiIKCltbcm91dGVzXV0KICBtcXR0X3RvcGljID0gIjRjNjZhNzg1LTE5MDAtNDg0NC04Y2FhLTU2ZmI4Y2ZkNjFlYiIKICBuYXRzX3RvcGljID0gIioiCg==
```

Expand Down
23 changes: 12 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
nats "github.com/nats-io/nats.go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -92,6 +93,9 @@ var (
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

cfg, err := loadEnvConfig()
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to load config: %s", err))
Expand All @@ -107,12 +111,11 @@ func main() {
logger.Error(fmt.Sprintf("Failed to load config: %s", err))
}

nc, err := nats.Connect(cfg.Server.NatsURL)
pubsub, err := brokers.NewPubSub(cfg.Server.BrokerURL, "", logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Server.NatsURL))
os.Exit(1)
logger.Fatal(fmt.Sprintf("Failed to connect to Broker: %s %s", err, cfg.Server.BrokerURL))
}
defer nc.Close()
defer pubsub.Close()

mqttClient, err := connectToMQTTBroker(cfg.MQTT, logger)
if err != nil {
Expand All @@ -121,7 +124,7 @@ func main() {
}
edgexClient := edgex.NewClient(cfg.Edgex.URL, logger)

svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger)
svc, err := agent.New(ctx, mqttClient, &cfg, edgexClient, pubsub, logger)
if err != nil {
logger.Error(fmt.Sprintf("Error in agent service: %s", err))
return
Expand All @@ -143,17 +146,15 @@ func main() {
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
b := conn.NewBroker(svc, mqttClient, cfg.Channels.Control, nc, logger)
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
b := conn.NewBroker(svc, mqttClient, cfg.Channels.Control, pubsub, logger)

srv := &http.Server{
Addr: fmt.Sprintf(":%s", cfg.Server.Port),
Handler: api.MakeHandler(svc),
}

g.Go(func() error {
return b.Subscribe()
return b.Subscribe(ctx)
})

g.Go(func() error {
Expand All @@ -172,8 +173,8 @@ func main() {

func loadEnvConfig() (agent.Config, error) {
sc := agent.ServerConfig{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envHTTPPort, defHTTPPort),
BrokerURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envHTTPPort, defHTTPPort),
}
cc := agent.ChanConfig{
Control: mainflux.Env(envCtrlChan, defCtrlChan),
Expand Down
2 changes: 1 addition & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ File = "config.toml"
username = "77d74527-7457-4dc2-9b36-01f01ce62726"

[server]
nats_url = "localhost:4222"
broker_url = "localhost:4222"
port = "9999"

[terminal]
Expand Down
64 changes: 64 additions & 0 deletions examples/publish/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main
SammyOina marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"flag"
"log"
"os"

mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
"github.com/nats-io/nats.go"
)

func main() {
var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
var showHelp = flag.Bool("h", false, "Show help message")

log.SetFlags(0)
flag.Usage = usage
flag.Parse()

if *showHelp {
showUsageAndExit(0)
}

args := flag.Args()
if len(args) != 2 {
showUsageAndExit(1)
}

subj, msg := args[0], []byte(args[1])

logger, err := mflog.New(os.Stdout, "info")
if err != nil {
log.Fatalf("failed to init logger: %s", err)
}

ps, err := brokers.NewPublisher(*urls)
if err != nil {
logger.Error(err.Error())
return
}
defer ps.Close()

if err := ps.Publish(context.Background(), subj, &messaging.Message{
Channel: subj,
Payload: msg,
}); err != nil {
logger.Error(err.Error())
return
}
logger.Info("Message published")
}

func usage() {
log.Printf("Usage: publish [-s server] <channel> <msg>\n")
flag.PrintDefaults()
}

func showUsageAndExit(exitcode int) {
usage()
os.Exit(exitcode)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
Expand Down Expand Up @@ -160,6 +163,7 @@ github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
Expand Down Expand Up @@ -197,6 +201,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/go-kit/kit/otelkit v0.42.
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down Expand Up @@ -279,6 +285,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
Loading