Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: v0.2.0 #3

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Common flags:
--id string MQTT client ID (default "mqtt-test-bssJjZUs1vhTvf6KpTpTLw")
-q, --quiet Quiet mode, only print results
-s, --server stringArray MQTT endpoint as username:password@host:port (default [tcp://localhost:1883])
--timeout duration Timeout for the test (default 10s)
--version version for mqtt-test
-v, --very-verbose Very verbose, print everything we can
```
Expand All @@ -40,7 +41,7 @@ Flags:
--retain Mark each published message as retained
--size int Approximate size of each message (pub adds a timestamp)
--timestamp Prepend a timestamp to each message
--topic string Base topic (prefix) to publish into (/{n} will be added if --topics > 0) (default "mqtt-test/fIqfOq5Lg5wk636V4sLXoc")
--topic string Base topic (prefix) to publish into (/{n} will be added if --topics > 0)
--topics int Cycle through NTopics appending "/{n}"
```

Expand All @@ -56,19 +57,23 @@ Flags:
--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1)
--retained int Expect to receive this many retained messages
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Base topic for the test, will subscribe to {topic}/+
--timestamp Expect a timestamp in the payload and use it to calculate receive time
--topic string Topic to subscribe to
```

##### pubsub

Publishes N messages, and waits for all of them to be received by subscribers. Measures end-end delivery time on the messages. Used with `--num-subscribers` can run several concurrent subscriber connections.

```
--messages int Number of messages to publish and receive (default 1)
--qos int MQTT QOS
--size int Approximate size of each message (pub adds a timestamp)
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Topic to publish and subscribe to (default "mqtt-test/JPrbNU6U3IbVQLIyazkP4y")
--messages int Number of messages to publish and receive (default 1)
--mps int Publish mps messages per second; 0 means no delay (default 1000)
--pub-server string Server to publish to. Defaults to the first server in --servers
--qos int MQTT QOS
--size int Message extra payload size (in addition to the JSON timestamp)
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Topic (or base topic if --topics > 1)
--topics int Number of topics to use, If more than one will add /1, /2, ... to --topic when publishing, and subscribe to topic/+ (default 1)
```

##### subret
Expand All @@ -78,10 +83,12 @@ topics N times. Measures time to SUBACK and to all retained messages received.
Used with `--subscribers` can run several concurrent subscriber connections.

```
--qos int MQTT QOS
--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1)
--size int Approximate size of each message (pub adds a timestamp)
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Base topic (prefix) for the test (default "mqtt-test/yNkmAFnFHETSGnQJNjwGdN")
--topics int Number of sub-topics to publish retained messages to (default 1)
--mps int Publish mps messages per second; 0 means no delay (default 1000)
--pub-server stringArray Server(s) to publish to. Defaults to --servers
--qos int MQTT QOS for subscriptions. Messages are published as QOS1.
--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1)
--retained int Number of retained messages to publish and receive (default 1)
--size int Message payload size
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string base topic (if --retaned > 1 will be published to topic/1, topic/2, ...)
```
68 changes: 17 additions & 51 deletions command-pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@
package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)

type pubCommand struct {
publisher
opts publisher
publishers int
timestamp bool
}

func newPubCommand() *cobra.Command {
Expand All @@ -39,59 +34,30 @@ func newPubCommand() *cobra.Command {
Args: cobra.NoArgs,
}

// Message options
cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Base topic (prefix) to publish into (/{n} will be added if --topics > 0)")
cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")
cmd.Flags().BoolVar(&c.retain, "retain", false, "Mark each published message as retained")
cmd.Flags().BoolVar(&c.timestamp, "timestamp", false, "Prepend a timestamp to each message")

// Test options
cmd.Flags().IntVar(&c.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of transactions to run, see the specific command")
cmd.Flags().IntVar(&c.opts.messages, "messages", 1, "Number of transactions to run, see the specific command")
cmd.Flags().IntVar(&c.opts.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.opts.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().BoolVar(&c.opts.retain, "retain", false, "Mark each published message as retained")
cmd.Flags().IntVar(&c.opts.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")
cmd.Flags().BoolVar(&c.opts.timestamp, "timestamp", false, "Prepend a timestamp to each message")
cmd.Flags().StringVar(&c.opts.topic, "topic", defaultTopic(), "Base topic (prefix) to publish into (/{n} will be added if --topics > 0)")
cmd.Flags().IntVar(&c.opts.topics, "topics", 0, `Cycle through NTopics appending "/{n}"`)
cmd.Flags().IntVar(&c.publishers, "publishers", 1, `Number of publishers to run concurrently, at --mps each`)
cmd.Flags().IntVar(&c.topics, "topics", 0, `Cycle through NTopics appending "/{n}"`)

return cmd
}

func (c *pubCommand) run(_ *cobra.Command, _ []string) {
msgChan := make(chan *Stat)
errChan := make(chan error)

doneCh := make(chan struct{})
for i := 0; i < c.publishers; i++ {
p := c.publisher // copy
p.clientID = ClientID + "-" + strconv.Itoa(i)
go p.publish(msgChan, errChan, c.timestamp)
}

pubOps := 0
pubNS := time.Duration(0)
pubBytes := int64(0)
timeout := time.NewTimer(Timeout)
defer timeout.Stop()

// get back 1 report per publisher
for n := 0; n < c.publishers; {
select {
case stat := <-msgChan:
pubOps += stat.Ops
pubNS += stat.NS["pub"]
pubBytes += stat.Bytes
n++

case err := <-errChan:
log.Fatalf("Error: %v", err)

case <-timeout.C:
log.Fatalf("Error: timeout waiting for publishers")
p := c.opts // copy
p.dials = dials(Servers)
p.clientID = ClientID
if c.publishers > 1 {
p.clientID = p.clientID + "-" + strconv.Itoa(i)
}
go p.publish(doneCh)
}

bb, _ := json.Marshal(Stat{
Ops: pubOps,
NS: map[string]time.Duration{"pub": pubNS},
Bytes: pubBytes,
})
os.Stdout.Write(bb)
waitN(doneCh, c.publishers, "publisher to finish")
}
128 changes: 56 additions & 72 deletions command-pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,107 +14,91 @@
package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)

type pubsubCommand struct {
messageOpts

messages int
pubOpts publisher
subOpts receiver
subscribers int
pubServer string
}

func newPubSubCommand() *cobra.Command {
c := &pubsubCommand{}

cmd := &cobra.Command{
Use: "pubsub [--flags...]",
Short: "Subscribe and receive N published messages",
Run: c.run,
Args: cobra.NoArgs,
}

// Message options
cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of messages to publish and receive")
cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Topic to publish and subscribe to")
cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")

// Test options
cmd.Flags().IntVar(&c.pubOpts.messages, "messages", 1, "Number of messages to publish and receive")
cmd.Flags().IntVar(&c.pubOpts.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.pubOpts.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.pubOpts.size, "size", 0, "Message extra payload size (in addition to the JSON timestamp)")
cmd.Flags().StringVar(&c.pubOpts.topic, "topic", defaultTopic(), "Topic (or base topic if --topics > 1)")
cmd.Flags().IntVar(&c.pubOpts.topics, "topics", 1, "Number of topics to use, If more than one will add /1, /2, ... to --topic when publishing, and subscribe to topic/+")
cmd.Flags().StringVar(&c.pubServer, "pub-server", "", "Server to publish to. Defaults to the first server in --servers")
cmd.Flags().IntVar(&c.subscribers, "subscribers", 1, `Number of subscribers to run concurrently`)

cmd.PreRun = func(_ *cobra.Command, _ []string) {
c.pubOpts.clientID = ClientID + "-pub"
c.pubOpts.timestamp = true
s := c.pubServer
if s == "" {
s = Servers[0]
}
c.pubOpts.dials = []dial{dial(s)}

c.subOpts.clientID = ClientID + "-sub"
c.subOpts.expectPublished = c.pubOpts.messages
c.subOpts.expectTimestamp = true
c.subOpts.filterPrefix = c.pubOpts.topic
c.subOpts.qos = c.pubOpts.qos
c.subOpts.repeat = 1
c.subOpts.topic = c.pubOpts.topic
if c.pubOpts.topics > 1 {
c.subOpts.topic = c.pubOpts.topic + "/+"
}
}

return cmd
}

func (c *pubsubCommand) run(_ *cobra.Command, _ []string) {
clientID := ClientID + "-sub"
readyCh := make(chan struct{})
errCh := make(chan error)
statsCh := make(chan *Stat)
doneCh := make(chan struct{})

// Connect all subscribers (and subscribe)
for i := 0; i < c.subscribers; i++ {
r := &receiver{
clientID: clientID + "-" + strconv.Itoa(i),
topic: c.topic,
qos: c.qos,
expectPublished: c.messages,
repeat: 1,
}
go r.receive(readyCh, statsCh, errCh)
counter := 0
if len(Servers) > 1 || c.subscribers > 1 {
counter = 1
}

// Wait for all subscriptions to signal ready
cSub := 0
timeout := time.NewTimer(Timeout)
defer timeout.Stop()
for cSub < c.subscribers {
select {
case <-readyCh:
cSub++
case err := <-errCh:
log.Fatal(err)
case <-timeout.C:
log.Fatalf("timeout waiting for subscribers to be ready")
}
}

// ready to receive, start publishing. The publisher will exit when done, no need to wait for it.
p := &publisher{
clientID: ClientID + "-pub",
messageOpts: c.messageOpts,
messages: c.messages,
mps: 1000,
}
go p.publish(nil, errCh, true)

// wait for the stats
total := Stat{
NS: make(map[string]time.Duration),
}
timeout = time.NewTimer(Timeout)
defer timeout.Stop()
for i := 0; i < c.subscribers; i++ {
select {
case stat := <-statsCh:
total.Ops += stat.Ops
total.Bytes += stat.Bytes
for k, v := range stat.NS {
total.NS[k] += v
N := c.subscribers * len(Servers)

// Connect all subscribers and subscribe. Wait for all subscriptions to
// signal ready before publishing.
for _, d := range dials(Servers) {
for i := 0; i < c.subscribers; i++ {
r := c.subOpts // copy
if r.clientID == "" {
r.clientID = ClientID
}
case err := <-errCh:
log.Fatalf("Error: %v", err)
case <-timeout.C:
log.Fatalf("Error: timeout waiting for messages")
if counter != 0 {
r.clientID = r.clientID + "-" + strconv.Itoa(counter)
counter++
}
r.dial = d
go r.receive(readyCh, doneCh)
}
}
waitN(readyCh, N, "subscribers to be ready")

// ready to receive, start publishing. Give the publisher the same done
// channel, will wait for one more.
go c.pubOpts.publish(doneCh)

bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
waitN(doneCh, N+1, "publisher and all subscribers to finish")
}
Loading
Loading