Skip to content

Commit

Permalink
MF36 - Add support from rabbitmq (#37)
Browse files Browse the repository at this point in the history
* enable multiple broker types

Signed-off-by: SammyOina <sammyoina@gmail.com>

* rename env vars

Signed-off-by: SammyOina <sammyoina@gmail.com>

* Update pkg/config/config.go

Co-authored-by: b1ackd0t <blackd0t@protonmail.com>
Signed-off-by: SammyOina <sammyoina@gmail.com>

* remove deadcode

Signed-off-by: SammyOina <sammyoina@gmail.com>

---------

Signed-off-by: SammyOina <sammyoina@gmail.com>
Co-authored-by: b1ackd0t <blackd0t@protonmail.com>
  • Loading branch information
SammyOina and rodneyosodo authored Jul 28, 2023
1 parent 67d0bc7 commit 728678c
Show file tree
Hide file tree
Showing 45 changed files with 10,302 additions and 33 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ VERSION ?= $(shell git describe --abbrev=0 --tags)
COMMIT ?= $(shell git rev-parse HEAD)
TIME ?= $(shell date +%F_%T)

ifneq ($(MF_BROKER_TYPE),)
MF_BROKER_TYPE := $(MF_BROKER_TYPE)
else
MF_BROKER_TYPE=nats
endif

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \
go build -mod=vendor -ldflags "-s -w \
go build -mod=vendor -tags $(MF_BROKER_TYPE) -ldflags "-s -w \
-X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \
-X 'github.com/mainflux/mainflux.Version=$(VERSION)' \
-X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \
Expand Down
27 changes: 15 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"context"
"crypto/tls"
"fmt"
"io"
Expand All @@ -22,12 +23,14 @@ import (
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
nats "github.com/nats-io/nats.go"
)

const (
svcName = "export"
defNatsURL = nats.DefaultURL
defBrokerURL = nats.DefaultURL
defLogLevel = "debug"
defPort = "8170"
defMqttHost = "tcp://localhost:1883"
Expand All @@ -47,9 +50,9 @@ const (
defCachePass = ""
defCacheDB = "0"

envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_EXPORT_LOG_LEVEL"
envPort = "MF_EXPORT_PORT"
envBrokerURL = "MF_BROKER_URL"
envLogLevel = "MF_EXPORT_LOG_LEVEL"
envPort = "MF_EXPORT_PORT"

envMqttHost = "MF_EXPORT_MQTT_HOST"
envMqttUsername = "MF_EXPORT_MQTT_USERNAME"
Expand All @@ -73,6 +76,7 @@ const (
)

func main() {
ctx := context.Background()
cfg, err := loadConfigs()
if err != nil {
log.Fatalf(err.Error())
Expand All @@ -83,14 +87,13 @@ func main() {
log.Fatalf(err.Error())
}

nc, err := nats.Connect(cfg.Server.NatsURL)
pubsub, err := brokers.NewPubSub(cfg.Server.BrokerURL, "", logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Server.NatsURL))
os.Exit(1)
logger.Fatal(fmt.Sprintf("Failed to connect to Broker: %s %s", err, cfg.Server.BrokerURL))
}
defer nc.Close()
defer pubsub.Close()

svc, err := export.New(cfg, logger)
svc, err := export.New(cfg, logger, pubsub)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create service :%s", err))
return
Expand All @@ -99,14 +102,14 @@ func main() {
logger.Error(fmt.Sprintf("Failed to start service %s", err))
return
}
svc.Subscribe(nc)
svc.Subscribe(ctx)

// Publish heartbeat
ticker := time.NewTicker(10000 * time.Millisecond)
go func() {
subject := fmt.Sprintf("%s.%s.%s", heartbeatSubject, svcName, service)
for range ticker.C {
if err := nc.Publish(subject, []byte{}); err != nil {
if err := pubsub.Publish(ctx, subject, &messaging.Message{Channel: subject}); err != nil {
logger.Error(fmt.Sprintf("Failed to publish heartbeat, %s", err))
}
}
Expand Down Expand Up @@ -149,7 +152,7 @@ func loadConfigs() (exp.Config, error) {
QoS := int(q)

sc := exp.Server{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
BrokerURL: mainflux.Env(envBrokerURL, defBrokerURL),
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
Port: mainflux.Env(envPort, defPort),
CachePass: mainflux.Env(envCachePass, defCachePass),
Expand Down
2 changes: 1 addition & 1 deletion docker/.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Docker: Environment variables in Compose

### Export
MF_NATS_URL=nats://broker:4222
MF_BROKER_URL=nats://broker:4222
MF_EXPORT_PORT=8178
MF_EXPORT_LOG_LEVEL=debug
MF_EXPORT_CONFIG_FILE=config.toml
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
Expand Down
43 changes: 43 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/containerd/continuity v0.4.1 h1:wQnVrjIyQ8vhU2sgOiL5T07jo+ouqc2bnKsv5/EqGhU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/cli v24.0.2+incompatible h1:QdqR7znue1mtkXIJ+ruQMGQhpw2JzMJLRXp6zpzF6tM=
github.com/docker/docker v24.0.2+incompatible h1:eATx+oLz9WdNVkQrr0qjQ8HvRJ4bOOxfzEo8R+dA3cg=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
Expand All @@ -29,21 +40,28 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mainflux/mainflux v0.0.0-20230713105239-52131eba669c h1:W6b2x4ngTpEC40RqHHktciQd8dKbDZbqKnAz26jliNI=
github.com/mainflux/mainflux v0.0.0-20230713105239-52131eba669c/go.mod h1:FoeJ13mrfikrsFDW6bOb3C44D5gZ5m9Jt249G1sLKq0=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
Expand All @@ -59,9 +77,15 @@ github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
github.com/opencontainers/runc v1.1.7 h1:y2EZDS8sNng4Ksf0GUYNhKbTShZJPJg1FiXJNH/uoCk=
github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
Expand All @@ -70,11 +94,23 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand All @@ -85,6 +121,7 @@ golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -118,6 +155,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -132,4 +170,9 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type MQTT struct {
}

type Server struct {
NatsURL string `json:"nats" toml:"nats" mapstructure:"nats"`
BrokerURL string `json:"broker_url" toml:"broker_url" mapstructure:"broker_url"`
LogLevel string `json:"log_level" toml:"log_level" mapstructure:"log_level"`
Port string `json:"port" toml:"port" mapstructure:"port"`
CacheURL string `json:"cache_url" toml:"cache_url" mapstructure:"port"`
Expand Down
13 changes: 6 additions & 7 deletions pkg/export/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
nats "github.com/nats-io/nats.go"
)

const (
Expand All @@ -41,7 +40,7 @@ type Route struct {
NatsTopic string
MqttTopic string
Subtopic string
Messages chan *nats.Msg
Messages chan *messaging.Message
Workers int
Type string
logger logger.Logger
Expand All @@ -59,7 +58,7 @@ func NewRoute(rc config.Route, log logger.Logger, pub messages.Publisher) *Route
Subtopic: rc.SubTopic,
Type: rc.Type,
Workers: w,
Messages: make(chan *nats.Msg, w),
Messages: make(chan *messaging.Message, w),
logger: log,
pub: pub,
}
Expand All @@ -85,19 +84,19 @@ func (r *Route) Process(data []byte) ([]byte, error) {

func (r *Route) Consume() {
for msg := range r.Messages {
payload, err := r.Process(msg.Data)
payload, err := r.Process(msg.Payload)
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to consume message %s", err))
}
topic := r.MqttTopic
if r.Subtopic != "" {
topic = fmt.Sprintf("%s/%s", r.MqttTopic, r.Subtopic)
}
topic = fmt.Sprintf("%s/%s", topic, strings.ReplaceAll(msg.Subject, ".", "/"))
if err := r.pub.Publish(msg.Subject, topic, payload); err != nil {
topic = fmt.Sprintf("%s/%s", topic, strings.ReplaceAll(msg.Channel, ".", "/"))
if err := r.pub.Publish(msg.Channel, topic, payload); err != nil {
r.logger.Error(fmt.Sprintf("Failed to publish on route %s: %s", r.MqttTopic, err))
}
r.msgDebug(msg.Subject, payload)
r.msgDebug(msg.Channel, payload)
}
}

Expand Down
42 changes: 31 additions & 11 deletions pkg/export/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package export

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
Expand All @@ -15,14 +16,14 @@ import (
"github.com/mainflux/export/pkg/messages"
logger "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
nats "github.com/nats-io/nats.go"
"github.com/mainflux/mainflux/pkg/messaging"
)

var errNoCacheConfigured = errors.New("No cache configured")

type Exporter interface {
Start(queue string) errors.Error
Subscribe(nc *nats.Conn)
Subscribe(ctx context.Context)
Logger() logger.Logger
}
type Service interface {
Expand All @@ -38,21 +39,22 @@ type exporter struct {
cfg config.Config
consumers map[string]*Route
logger logger.Logger
pubsub messaging.PubSub
sync.RWMutex
}

const (
exportGroup = "export"
NatsSub = "export"
NatsAll = ">"
Channels = "channels"
Messages = "messages"
NatsSub = "export"
NatsAll = ">"
Channels = "channels"
Messages = "messages"
svcName = "export"
)

var errNoRoutesConfigured = errors.New("No routes configured")

// New create new instance of export service.
func New(c config.Config, l logger.Logger) (Service, error) {
func New(c config.Config, l logger.Logger, pubsub messaging.PubSub) (Service, error) {
routes := make(map[string]*Route)
id := fmt.Sprintf("export-%s", c.MQTT.Username)

Expand All @@ -61,6 +63,7 @@ func New(c config.Config, l logger.Logger) (Service, error) {
logger: l,
cfg: c,
consumers: routes,
pubsub: pubsub,
}
client, err := e.mqttConnect(c, l)
if err != nil {
Expand Down Expand Up @@ -104,10 +107,27 @@ func (e *exporter) newRoute(r config.Route) *Route {
return NewRoute(r, e.logger, e)
}

func (e *exporter) Subscribe(nc *nats.Conn) {
type handleFunc func(msg *messaging.Message) error

func (h handleFunc) Handle(msg *messaging.Message) error {
return h(msg)

}

func (h handleFunc) Cancel() error {
return nil
}

func handle(mesChan chan *messaging.Message) handleFunc {
return func(msg *messaging.Message) error {
mesChan <- msg
return nil
}
}

func (e *exporter) Subscribe(ctx context.Context) {
for _, r := range e.consumers {
_, err := nc.ChanQueueSubscribe(r.NatsTopic, exportGroup, r.Messages)
if err != nil {
if err := e.pubsub.Subscribe(ctx, svcName, r.NatsTopic, handle(r.Messages)); err != nil {
e.logger.Error(fmt.Sprintf("Failed to subscribe to NATS %s: %s", r.NatsTopic, err))
}
for i := 0; i < r.Workers; i++ {
Expand Down
Loading

0 comments on commit 728678c

Please sign in to comment.