Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
TheSmallBoat committed May 26, 2020
1 parent 80e4745 commit 5ea8cdc
Show file tree
Hide file tree
Showing 7 changed files with 1,043 additions and 0 deletions.
95 changes: 95 additions & 0 deletions common/conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package common

import (
"fmt"
)

type Config struct {
General GeneralConf `gcfg:"general"`
MqttTopic MqttTopicConf `gcfg:"mqtt-topic"`
MqttPublisher MqttPublisherConf `gcfg:"mqtt-publisher"`
MqttSubscriber MqttSubscriberConf `gcfg:"mqtt-subscriber"`
}

type GeneralConf struct {
Debug bool
Brokername string
}

type MqttTopicConf struct {
Topicroot string
Numofeachlevel uint16 //the number of topics for each level of the pools.
}

type MqttPublisherConf struct {
Scheme string
Hostname string
Port uint
Cleansession bool
Qos uint8
Pingtimeout uint8
Keepalive uint16
Username string
Password string
Prefixname string
Prefixshort string
Publisherseachtopic uint8
Messageseachpublisher uint16
Enablestaticmessage bool
Intervalmessagesgeneration uint16
}

type MqttSubscriberConf struct {
Scheme string
Hostname string
Port uint
Cleansession bool
Qos uint8
Pingtimeout uint8
Keepalive uint16
Username string
Password string
Prefixname string
Prefixshort string
Subscribereachtopic uint8
}

type CalculateTargetData struct {
Topics int
Publishers int
PublishQos int
MessagesForPublish int
Subscribers int
SubscribeQos int
MessagesForSubscribe int
}

func (cfg *Config) CalculateData() *CalculateTargetData {
numTopics := int(cfg.MqttTopic.Numofeachlevel * 3)
numPublishers := int(cfg.MqttPublisher.Publisherseachtopic) * numTopics
numMessagesForPublish := int(cfg.MqttPublisher.Messageseachpublisher) * numPublishers
numSubscribers := int(cfg.MqttSubscriber.Subscribereachtopic) * numTopics
numMessagesForSubscribe := int(cfg.MqttPublisher.Messageseachpublisher) * int(cfg.MqttPublisher.Publisherseachtopic) * numSubscribers

return &CalculateTargetData{
numTopics,
numPublishers,
int(cfg.MqttPublisher.Qos),
numMessagesForPublish,
numSubscribers,
int(cfg.MqttSubscriber.Qos),
numMessagesForSubscribe,
}
}

func (cfg *Config) GetConfInfo() string {
info := fmt.Sprintf("Configuration information ... \n[general] => %+v, [mqtt-topic] => %+v, [mqtt-publisher] => %+v, [mqtt-subscriber] => %+v \n ", cfg.General, cfg.MqttTopic, cfg.MqttPublisher, cfg.MqttSubscriber)
return info
}

func (cfg *Config) CalculateInfo() string {
ctd := cfg.CalculateData()
line := "-------------------------------------------------------------------------------------------------------------------"
info := fmt.Sprintf("Calculated data based on configuration information will be ... \n %s \n [Topics: %s], [publishers (Qos:%d): %s -> messages: %s], [subscribers (Qos:%d): %s <- messages: %s] \n %s \n ", line, FormatCommas(ctd.Topics), ctd.PublishQos, FormatCommas(ctd.Publishers), FormatCommas(ctd.MessagesForPublish), ctd.SubscribeQos, FormatCommas(ctd.Subscribers), FormatCommas(ctd.MessagesForSubscribe), line)
return info
}
45 changes: 45 additions & 0 deletions common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package common

import (
"fmt"
"os"
"runtime"
"strings"

log "github.com/Sirupsen/logrus"
gcfg "gopkg.in/gcfg.v1"
)

func UserHomeDir() string {
if runtime.GOOS == "windows" {
home := os.Getenv("HOMEDRIVE") + os.Getenv("HOMEPATH")
if home == "" {
home = os.Getenv("USERPROFILE")
}
return home
}
return os.Getenv("HOME")
}

func LoadConf(path string) (*Config, error) {
home := UserHomeDir()
path = strings.Replace(path, "~", home, 1)
log.Info(fmt.Sprintf("Loading configuration information from '%s' ", path))

var cfg Config
err := gcfg.ReadFileInto(&cfg, path)
if err != nil {
return nil, err
}
log.Info(cfg.GetConfInfo())

if cfg.General.Debug {
log.SetLevel(log.DebugLevel)
log.Debug("The debug mode ... [ENABLE]")
} else {
log.SetLevel(log.InfoLevel)
}

log.Info(cfg.CalculateInfo())
return &cfg, nil
}
37 changes: 37 additions & 0 deletions common/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package common

import (
"fmt"
"regexp"
)

/*
import "strconv"
func FormatCommas(n int64) string {
in := strconv.FormatInt(n, 10)
out := make([]byte, len(in)+(len(in)-2+int(in[0]/'0'))/3)
if in[0] == '-' {
in, out[0] = in[1:], '-'
}
for i, j, k := len(in)-1, len(out)-1, 0; ; i, j = i-1, j-1 {
out[j] = in[i]
if i == 0 {
return string(out)
}
if k++; k == 3 {
j, k = j-1, 0
out[j] = ','
}
}
}*/

func FormatCommas(num int) string {
str := fmt.Sprintf("%d", num)
re := regexp.MustCompile("(\\d+)(\\d{3})")
for n := ""; n != str; {
n = str
str = re.ReplaceAllString(str, "$1,$2")
}
return str
}
152 changes: 152 additions & 0 deletions common/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package common

import (
"crypto/rand"
"fmt"
"sync/atomic"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
)

const maxClientIdRandomSectionLength = 6

type MqttClientUnit struct {
MqttClient MQTT.Client
Opts *MQTT.ClientOptions
Topic string
Qos uint8

WML *WorkerMetricsWithLevel
SInfo *SubscriberInfoOnReceivedMessage
}

func newPublisherMqttOptions(mpc *MqttPublisherConf) *MQTT.ClientOptions {
var brokerUri = fmt.Sprintf("%s://%s:%d", mpc.Scheme, mpc.Hostname, mpc.Port)
return initMqttOptions(brokerUri, mpc.Username, mpc.Password, mpc.Cleansession, mpc.Pingtimeout, mpc.Keepalive)
}
func newSubscriberMqttOptions(msc *MqttSubscriberConf) *MQTT.ClientOptions {
var brokerUri = fmt.Sprintf("%s://%s:%d", msc.Scheme, msc.Hostname, msc.Port)
return initMqttOptions(brokerUri, msc.Username, msc.Password, msc.Cleansession, msc.Pingtimeout, msc.Keepalive)
}

func initMqttOptions(brokerUri string, username string, password string, cleansession bool, pingtimeout uint8, keepalive uint16) *MQTT.ClientOptions {
opts := MQTT.NewClientOptions()

opts.SetAutoReconnect(true)
opts.SetCleanSession(cleansession)
opts.SetPingTimeout(time.Duration(pingtimeout) * time.Second)
opts.SetConnectTimeout(time.Duration(keepalive) * time.Second)

opts.AddBroker(brokerUri)
if username != "" {
opts.SetUsername(username)
}
if password != "" {
opts.SetPassword(password)
}

return opts
}

// getRandomClientId returns randomized ClientId.
func getRandomClientId(prefix string) string {
const alphaNum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
var bytes = make([]byte, maxClientIdRandomSectionLength)
_, _ = rand.Read(bytes)
for i, b := range bytes {
bytes[i] = alphaNum[b%byte(len(alphaNum))]
}
return prefix + "-" + string(bytes)
}

// with Connects connect to the MQTT broker with Options.
func NewPublisherMqttClientUnit(mpc *MqttPublisherConf, mtc *MqttTopicConf, wml *WorkerMetricsWithLevel, level string, topicid uint16) (*MqttClientUnit, error) {
atomic.AddUint32(&wml.numOfPublishers, 1)
clientId := getRandomClientId(mpc.Prefixshort)
pubTopic := fmt.Sprintf("%s/%s/%d", level, mtc.Topicroot, topicid)

opts := newPublisherMqttOptions(mpc)
opts.SetClientID(clientId)
mcu := &MqttClientUnit{Opts: opts, Topic: pubTopic, Qos: mpc.Qos, WML: wml}
err := mcu.setMqttClientHandler(mcu.PublisherOnConnect, mcu.PublisherConnectionLost)
if err != nil {
atomic.AddUint32(&wml.numOfPublisherOnConnectErrors, 1)
}
atomic.AddUint32(&wml.numOfPublishersDone, 1)
return mcu, err
}

func NewSubscriberMqttClientUnit(si *SubscriberInfoOnReceivedMessage, msc *MqttSubscriberConf, mtc *MqttTopicConf, wml *WorkerMetricsWithLevel, level string, topicid uint16) (*MqttClientUnit, error) {
atomic.AddUint32(&wml.numOfSubscribers, 1)
clientId := getRandomClientId(msc.Prefixshort)
subTopic := fmt.Sprintf("%s/%s/%d", level, mtc.Topicroot, topicid)

opts := newSubscriberMqttOptions(msc)
opts.SetClientID(clientId)
mcu := &MqttClientUnit{Opts: opts, Topic: subTopic, Qos: msc.Qos, WML: wml, SInfo: si}
err := mcu.setMqttClientHandler(mcu.SubscriberOnConnect, mcu.SubscriberConnectionLost)
if err != nil {
atomic.AddUint32(&wml.numOfSubscriberOnConnectErrors, 1)
}
atomic.AddUint32(&wml.numOfSubscribersDone, 1)
return mcu, err
}

func (m *MqttClientUnit) setMqttClientHandler(onConn MQTT.OnConnectHandler, conLostHandler MQTT.ConnectionLostHandler) error {
m.Opts.SetOnConnectHandler(onConn)
m.Opts.SetConnectionLostHandler(conLostHandler)

client := MQTT.NewClient(m.Opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
m.MqttClient = client
return nil
}

func (m *MqttClientUnit) PublisherOnConnect(client MQTT.Client) {
atomic.AddUint32(&m.WML.numOfPublisherOnConnect, 1)
}

func (m *MqttClientUnit) PublisherConnectionLost(client MQTT.Client, reason error) {
atomic.AddUint32(&m.WML.numOfPublisherConnectionLost, 1)
}

func (m *MqttClientUnit) onMessageReceived(client MQTT.Client, msg MQTT.Message) {
atomic.AddUint32(&m.WML.numOfSuccessfulMessagesSubscribed, 1)
atomic.AddUint32(&m.SInfo.num, 1)
//atomic.AddUint64(&m.SInfo.byte,uint64(len(msg.Payload())))

m.SInfo.t1 = time.Now()
if m.SInfo.t0.IsZero() {
m.SInfo.t0 = m.SInfo.t1
}
}

func (m *MqttClientUnit) SubscriberOnConnect(client MQTT.Client) {
atomic.AddUint32(&m.WML.numOfSubscriberOnConnect, 1)
if token := client.Subscribe(m.Topic, byte(m.Qos), m.onMessageReceived); token.Wait() && token.Error() != nil {
atomic.AddUint32(&m.WML.numOfSubscriberOnConnectSubErrors, 1)
return
}
}

func (m *MqttClientUnit) SubscriberConnectionLost(client MQTT.Client, reason error) {
atomic.AddUint32(&m.WML.numOfSubscriberConnectionLost, 1)
}

func (m *MqttClientUnit) Disconnect(counter *uint32) {
if m.MqttClient.IsConnected() {
m.MqttClient.Disconnect(250)
atomic.AddUint32(counter, 1)
}
}

func (m *MqttClientUnit) SubscriberExit() {
if token := m.MqttClient.Unsubscribe(m.Topic); token.Wait() && token.Error() != nil {
atomic.AddUint32(&m.WML.numOfFailedSubscriberOnUnsubscribe, 1)
}
atomic.AddUint32(&m.WML.numOfSuccessfulSubscriberOnUnsubscribe, 1)
m.Disconnect(&m.WML.numOfSubscriberOnDisConnect)
}
Loading

0 comments on commit 5ea8cdc

Please sign in to comment.