Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF36 - Add support from rabbitmq #37

Merged
merged 4 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ VERSION ?= $(shell git describe --abbrev=0 --tags)
COMMIT ?= $(shell git rev-parse HEAD)
TIME ?= $(shell date +%F_%T)

ifneq ($(MF_BROKER_TYPE),)
MF_BROKER_TYPE := $(MF_BROKER_TYPE)
else
MF_BROKER_TYPE=nats
endif

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \
go build -mod=vendor -ldflags "-s -w \
go build -mod=vendor -tags $(MF_BROKER_TYPE) -ldflags "-s -w \
-X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \
-X 'github.com/mainflux/mainflux.Version=$(VERSION)' \
-X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \
Expand Down
27 changes: 15 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"context"
"crypto/tls"
"fmt"
"io"
Expand All @@ -22,12 +23,14 @@ import (
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
nats "github.com/nats-io/nats.go"
)

const (
svcName = "export"
defNatsURL = nats.DefaultURL
defBrokerURL = nats.DefaultURL
defLogLevel = "debug"
defPort = "8170"
defMqttHost = "tcp://localhost:1883"
Expand All @@ -47,9 +50,9 @@ const (
defCachePass = ""
defCacheDB = "0"

envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_EXPORT_LOG_LEVEL"
envPort = "MF_EXPORT_PORT"
envBrokerURL = "MF_BROKER_URL"
envLogLevel = "MF_EXPORT_LOG_LEVEL"
envPort = "MF_EXPORT_PORT"

envMqttHost = "MF_EXPORT_MQTT_HOST"
envMqttUsername = "MF_EXPORT_MQTT_USERNAME"
Expand All @@ -73,6 +76,7 @@ const (
)

func main() {
ctx := context.Background()
cfg, err := loadConfigs()
if err != nil {
log.Fatalf(err.Error())
Expand All @@ -83,14 +87,13 @@ func main() {
log.Fatalf(err.Error())
}

nc, err := nats.Connect(cfg.Server.NatsURL)
pubsub, err := brokers.NewPubSub(cfg.Server.BrokerURL, "", logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Server.NatsURL))
os.Exit(1)
logger.Fatal(fmt.Sprintf("Failed to connect to Broker: %s %s", err, cfg.Server.BrokerURL))
}
defer nc.Close()
defer pubsub.Close()

svc, err := export.New(cfg, logger)
svc, err := export.New(cfg, logger, pubsub)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create service :%s", err))
return
Expand All @@ -99,14 +102,14 @@ func main() {
logger.Error(fmt.Sprintf("Failed to start service %s", err))
return
}
svc.Subscribe(nc)
svc.Subscribe(ctx)

// Publish heartbeat
ticker := time.NewTicker(10000 * time.Millisecond)
go func() {
subject := fmt.Sprintf("%s.%s.%s", heartbeatSubject, svcName, service)
for range ticker.C {
if err := nc.Publish(subject, []byte{}); err != nil {
if err := pubsub.Publish(ctx, subject, &messaging.Message{Channel: subject}); err != nil {
logger.Error(fmt.Sprintf("Failed to publish heartbeat, %s", err))
}
}
Expand Down Expand Up @@ -149,7 +152,7 @@ func loadConfigs() (exp.Config, error) {
QoS := int(q)

sc := exp.Server{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
BrokerURL: mainflux.Env(envBrokerURL, defBrokerURL),
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
Port: mainflux.Env(envPort, defPort),
CachePass: mainflux.Env(envCachePass, defCachePass),
Expand Down
2 changes: 1 addition & 1 deletion docker/.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Docker: Environment variables in Compose

### Export
MF_NATS_URL=nats://broker:4222
MF_BROKER_URL=nats://broker:4222
MF_EXPORT_PORT=8178
MF_EXPORT_LOG_LEVEL=debug
MF_EXPORT_CONFIG_FILE=config.toml
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
Expand Down
43 changes: 43 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/containerd/continuity v0.4.1 h1:wQnVrjIyQ8vhU2sgOiL5T07jo+ouqc2bnKsv5/EqGhU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/cli v24.0.2+incompatible h1:QdqR7znue1mtkXIJ+ruQMGQhpw2JzMJLRXp6zpzF6tM=
github.com/docker/docker v24.0.2+incompatible h1:eATx+oLz9WdNVkQrr0qjQ8HvRJ4bOOxfzEo8R+dA3cg=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
Expand All @@ -29,21 +40,28 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mainflux/mainflux v0.0.0-20230713105239-52131eba669c h1:W6b2x4ngTpEC40RqHHktciQd8dKbDZbqKnAz26jliNI=
github.com/mainflux/mainflux v0.0.0-20230713105239-52131eba669c/go.mod h1:FoeJ13mrfikrsFDW6bOb3C44D5gZ5m9Jt249G1sLKq0=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
Expand All @@ -59,9 +77,15 @@ github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
github.com/opencontainers/runc v1.1.7 h1:y2EZDS8sNng4Ksf0GUYNhKbTShZJPJg1FiXJNH/uoCk=
github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
Expand All @@ -70,11 +94,23 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand All @@ -85,6 +121,7 @@ golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -118,6 +155,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -132,4 +170,9 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type MQTT struct {
}

type Server struct {
NatsURL string `json:"nats" toml:"nats" mapstructure:"nats"`
BrokerURL string `json:"broker_url" toml:"broker_url" mapstructure:"broker_url"`
LogLevel string `json:"log_level" toml:"log_level" mapstructure:"log_level"`
Port string `json:"port" toml:"port" mapstructure:"port"`
CacheURL string `json:"cache_url" toml:"cache_url" mapstructure:"port"`
Expand Down
13 changes: 6 additions & 7 deletions pkg/export/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
nats "github.com/nats-io/nats.go"
)

const (
Expand All @@ -41,7 +40,7 @@ type Route struct {
NatsTopic string
MqttTopic string
Subtopic string
Messages chan *nats.Msg
Messages chan *messaging.Message
Workers int
Type string
logger logger.Logger
Expand All @@ -59,7 +58,7 @@ func NewRoute(rc config.Route, log logger.Logger, pub messages.Publisher) *Route
Subtopic: rc.SubTopic,
Type: rc.Type,
Workers: w,
Messages: make(chan *nats.Msg, w),
Messages: make(chan *messaging.Message, w),
logger: log,
pub: pub,
}
Expand All @@ -85,19 +84,19 @@ func (r *Route) Process(data []byte) ([]byte, error) {

func (r *Route) Consume() {
for msg := range r.Messages {
payload, err := r.Process(msg.Data)
payload, err := r.Process(msg.Payload)
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to consume message %s", err))
}
topic := r.MqttTopic
if r.Subtopic != "" {
topic = fmt.Sprintf("%s/%s", r.MqttTopic, r.Subtopic)
}
topic = fmt.Sprintf("%s/%s", topic, strings.ReplaceAll(msg.Subject, ".", "/"))
if err := r.pub.Publish(msg.Subject, topic, payload); err != nil {
topic = fmt.Sprintf("%s/%s", topic, strings.ReplaceAll(msg.Channel, ".", "/"))
if err := r.pub.Publish(msg.Channel, topic, payload); err != nil {
r.logger.Error(fmt.Sprintf("Failed to publish on route %s: %s", r.MqttTopic, err))
}
r.msgDebug(msg.Subject, payload)
r.msgDebug(msg.Channel, payload)
}
}

Expand Down
42 changes: 31 additions & 11 deletions pkg/export/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package export

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
Expand All @@ -15,14 +16,14 @@ import (
"github.com/mainflux/export/pkg/messages"
logger "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
nats "github.com/nats-io/nats.go"
"github.com/mainflux/mainflux/pkg/messaging"
)

var errNoCacheConfigured = errors.New("No cache configured")

type Exporter interface {
Start(queue string) errors.Error
Subscribe(nc *nats.Conn)
Subscribe(ctx context.Context)
Logger() logger.Logger
}
type Service interface {
Expand All @@ -38,21 +39,22 @@ type exporter struct {
cfg config.Config
consumers map[string]*Route
logger logger.Logger
pubsub messaging.PubSub
sync.RWMutex
}

const (
exportGroup = "export"
NatsSub = "export"
NatsAll = ">"
Channels = "channels"
Messages = "messages"
NatsSub = "export"
NatsAll = ">"
Channels = "channels"
Messages = "messages"
svcName = "export"
)

var errNoRoutesConfigured = errors.New("No routes configured")

// New create new instance of export service.
func New(c config.Config, l logger.Logger) (Service, error) {
func New(c config.Config, l logger.Logger, pubsub messaging.PubSub) (Service, error) {
routes := make(map[string]*Route)
id := fmt.Sprintf("export-%s", c.MQTT.Username)

Expand All @@ -61,6 +63,7 @@ func New(c config.Config, l logger.Logger) (Service, error) {
logger: l,
cfg: c,
consumers: routes,
pubsub: pubsub,
}
client, err := e.mqttConnect(c, l)
if err != nil {
Expand Down Expand Up @@ -104,10 +107,27 @@ func (e *exporter) newRoute(r config.Route) *Route {
return NewRoute(r, e.logger, e)
}

func (e *exporter) Subscribe(nc *nats.Conn) {
type handleFunc func(msg *messaging.Message) error

func (h handleFunc) Handle(msg *messaging.Message) error {
return h(msg)

}

func (h handleFunc) Cancel() error {
return nil
}

func handle(mesChan chan *messaging.Message) handleFunc {
return func(msg *messaging.Message) error {
mesChan <- msg
return nil
}
}

func (e *exporter) Subscribe(ctx context.Context) {
for _, r := range e.consumers {
_, err := nc.ChanQueueSubscribe(r.NatsTopic, exportGroup, r.Messages)
if err != nil {
if err := e.pubsub.Subscribe(ctx, svcName, r.NatsTopic, handle(r.Messages)); err != nil {
e.logger.Error(fmt.Sprintf("Failed to subscribe to NATS %s: %s", r.NatsTopic, err))
}
for i := 0; i < r.Workers; i++ {
Expand Down
Loading