Skip to content

Commit

Permalink
wip v0.2.0 candidate
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Feb 26, 2024
1 parent b4ee55a commit b03919d
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 446 deletions.
68 changes: 17 additions & 51 deletions command-pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@
package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)

type pubCommand struct {
publisher
opts publisher
publishers int
timestamp bool
}

func newPubCommand() *cobra.Command {
Expand All @@ -39,59 +34,30 @@ func newPubCommand() *cobra.Command {
Args: cobra.NoArgs,
}

// Message options
cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Base topic (prefix) to publish into (/{n} will be added if --topics > 0)")
cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")
cmd.Flags().BoolVar(&c.retain, "retain", false, "Mark each published message as retained")
cmd.Flags().BoolVar(&c.timestamp, "timestamp", false, "Prepend a timestamp to each message")

// Test options
cmd.Flags().IntVar(&c.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of transactions to run, see the specific command")
cmd.Flags().IntVar(&c.opts.messages, "messages", 1, "Number of transactions to run, see the specific command")
cmd.Flags().IntVar(&c.opts.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.opts.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().BoolVar(&c.opts.retain, "retain", false, "Mark each published message as retained")
cmd.Flags().IntVar(&c.opts.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")
cmd.Flags().BoolVar(&c.opts.timestamp, "timestamp", false, "Prepend a timestamp to each message")
cmd.Flags().StringVar(&c.opts.topic, "topic", defaultTopic(), "Base topic (prefix) to publish into (/{n} will be added if --topics > 0)")
cmd.Flags().IntVar(&c.opts.topics, "topics", 0, `Cycle through NTopics appending "/{n}"`)
cmd.Flags().IntVar(&c.publishers, "publishers", 1, `Number of publishers to run concurrently, at --mps each`)
cmd.Flags().IntVar(&c.topics, "topics", 0, `Cycle through NTopics appending "/{n}"`)

return cmd
}

func (c *pubCommand) run(_ *cobra.Command, _ []string) {
msgChan := make(chan *Stat)
errChan := make(chan error)

doneCh := make(chan struct{})
for i := 0; i < c.publishers; i++ {
p := c.publisher // copy
p.clientID = ClientID + "-" + strconv.Itoa(i)
go p.publish(msgChan, errChan)
}

pubOps := 0
pubNS := time.Duration(0)
pubBytes := int64(0)
timeout := time.NewTimer(Timeout)
defer timeout.Stop()

// get back 1 report per publisher
for n := 0; n < c.publishers; {
select {
case stat := <-msgChan:
pubOps += stat.Ops
pubNS += stat.NS["pub"]
pubBytes += stat.Bytes
n++

case err := <-errChan:
log.Fatalf("Error: %v", err)

case <-timeout.C:
log.Fatalf("Error: timeout waiting for publishers")
p := c.opts // copy
p.dials = dials(Servers)
p.clientID = ClientID
if c.publishers > 1 {
p.clientID = p.clientID + "-" + strconv.Itoa(i)
}
go p.publish(doneCh)
}

bb, _ := json.Marshal(Stat{
Ops: pubOps,
NS: map[string]time.Duration{"pub": pubNS},
Bytes: pubBytes,
})
os.Stdout.Write(bb)
waitN(doneCh, c.publishers, "publisher to finish")
}
129 changes: 56 additions & 73 deletions command-pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,108 +14,91 @@
package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)

type pubsubCommand struct {
messageOpts

messages int
pubOpts publisher
subOpts receiver
subscribers int
pubServer string
}

func newPubSubCommand() *cobra.Command {
c := &pubsubCommand{}

cmd := &cobra.Command{
Use: "pubsub [--flags...]",
Short: "Subscribe and receive N published messages",
Run: c.run,
Args: cobra.NoArgs,
}

// Message options
cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of messages to publish and receive")
cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Topic to publish and subscribe to")
cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")

// Test options
cmd.Flags().IntVar(&c.pubOpts.messages, "messages", 1, "Number of messages to publish and receive")
cmd.Flags().IntVar(&c.pubOpts.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.pubOpts.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.pubOpts.size, "size", 0, "Message extra payload size (in addition to the JSON timestamp)")
cmd.Flags().StringVar(&c.pubOpts.topic, "topic", defaultTopic(), "Topic (or base topic if --topics > 1)")
cmd.Flags().IntVar(&c.pubOpts.topics, "topics", 1, "Number of topics to use, If more than one will add /1, /2, ... to --topic when publishing, and subscribe to topic/+")
cmd.Flags().StringVar(&c.pubServer, "pub-server", "", "Server to publish to. Defaults to the first server in --servers")
cmd.Flags().IntVar(&c.subscribers, "subscribers", 1, `Number of subscribers to run concurrently`)

cmd.PreRun = func(_ *cobra.Command, _ []string) {
c.pubOpts.clientID = ClientID + "-pub"
c.pubOpts.timestamp = true
s := c.pubServer
if s == "" {
s = Servers[0]
}
c.pubOpts.dials = []dial{dial(s)}

c.subOpts.clientID = ClientID + "-sub"
c.subOpts.expectPublished = c.pubOpts.messages
c.subOpts.expectTimestamp = true
c.subOpts.filterPrefix = c.pubOpts.topic
c.subOpts.qos = c.pubOpts.qos
c.subOpts.repeat = 1
c.subOpts.topic = c.pubOpts.topic
if c.pubOpts.topics > 1 {
c.subOpts.topic = c.pubOpts.topic + "/+"
}
}

return cmd
}

func (c *pubsubCommand) run(_ *cobra.Command, _ []string) {
clientID := ClientID + "-sub"
readyCh := make(chan struct{})
errCh := make(chan error)
statsCh := make(chan *Stat)
doneCh := make(chan struct{})

// Connect all subscribers (and subscribe)
for i := 0; i < c.subscribers; i++ {
r := &receiver{
clientID: clientID + "-" + strconv.Itoa(i),
topic: c.topic,
qos: c.qos,
expectPublished: c.messages,
repeat: 1,
}
go r.receive(readyCh, statsCh, errCh)
counter := 0
if len(Servers) > 1 || c.subscribers > 1 {
counter = 1
}

// Wait for all subscriptions to signal ready
cSub := 0
timeout := time.NewTimer(Timeout)
defer timeout.Stop()
for cSub < c.subscribers {
select {
case <-readyCh:
cSub++
case err := <-errCh:
log.Fatal(err)
case <-timeout.C:
log.Fatalf("timeout waiting for subscribers to be ready")
}
}

// ready to receive, start publishing. The publisher will exit when done, no need to wait for it.
p := &publisher{
clientID: ClientID + "-pub",
messageOpts: c.messageOpts,
messages: c.messages,
mps: 1000,
timestamp: true,
}
go p.publish(nil, errCh)

// wait for the stats
total := Stat{
NS: make(map[string]time.Duration),
}
timeout = time.NewTimer(Timeout)
defer timeout.Stop()
for i := 0; i < c.subscribers; i++ {
select {
case stat := <-statsCh:
total.Ops += stat.Ops
total.Bytes += stat.Bytes
for k, v := range stat.NS {
total.NS[k] += v
N := c.subscribers * len(Servers)

// Connect all subscribers and subscribe. Wait for all subscriptions to
// signal ready before publishing.
for _, d := range dials(Servers) {
for i := 0; i < c.subscribers; i++ {
r := c.subOpts // copy
if r.clientID == "" {
r.clientID = ClientID
}
case err := <-errCh:
log.Fatalf("Error: %v", err)
case <-timeout.C:
log.Fatalf("Error: timeout waiting for messages")
if counter != 0 {
r.clientID = r.clientID + "-" + strconv.Itoa(counter)
counter++
}
r.dial = d
go r.receive(readyCh, doneCh)
}
}
waitN(readyCh, N, "subscribers to be ready")

// ready to receive, start publishing. Give the publisher the same done
// channel, will wait for one more.
go c.pubOpts.publish(doneCh)

bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
waitN(doneCh, N+1, "publisher and all subscribers to finish")
}
Loading

0 comments on commit b03919d

Please sign in to comment.