Skip to content

Commit

Permalink
Added --timestamp to pub and sub
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Feb 21, 2024
1 parent 54ed0c1 commit b4ee55a
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 19 deletions.
2 changes: 1 addition & 1 deletion command-pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *pubCommand) run(_ *cobra.Command, _ []string) {
for i := 0; i < c.publishers; i++ {
p := c.publisher // copy
p.clientID = ClientID + "-" + strconv.Itoa(i)
go p.publish(msgChan, errChan, c.timestamp)
go p.publish(msgChan, errChan)
}

pubOps := 0
Expand Down
3 changes: 2 additions & 1 deletion command-pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ func (c *pubsubCommand) run(_ *cobra.Command, _ []string) {
messageOpts: c.messageOpts,
messages: c.messages,
mps: 1000,
timestamp: true,
}
go p.publish(nil, errCh, true)
go p.publish(nil, errCh)

// wait for the stats
total := Stat{
Expand Down
10 changes: 8 additions & 2 deletions command-sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type subCommand struct {
subscribers int
expectRetained int
expectPublished int
expectTimestamp bool
}

func newSubCommand() *cobra.Command {
Expand All @@ -50,12 +51,13 @@ func newSubCommand() *cobra.Command {
cmd.Flags().IntVar(&c.subscribers, "subscribers", 1, `Number of subscribers to run concurrently`)
cmd.Flags().IntVar(&c.expectRetained, "retained", 0, `Expect to receive this many retained messages`)
cmd.Flags().IntVar(&c.expectPublished, "messages", 0, `Expect to receive this many published messages`)
cmd.Flags().BoolVar(&c.expectTimestamp, "timestamp", false, "Expect a timestamp in the payload and use it to calculate receive time")

return cmd
}

func (c *subCommand) run(_ *cobra.Command, _ []string) {
total := runSubPrepublishRetained(c.subscribers, c.repeat, c.expectRetained, c.expectPublished, c.messageOpts, false)
total := runSubPrepublishRetained(c.subscribers, c.repeat, c.expectRetained, c.expectPublished, c.messageOpts, c.expectTimestamp, false)
bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
}
Expand All @@ -66,6 +68,7 @@ func runSubPrepublishRetained(
expectRetained,
expectPublished int,
messageOpts messageOpts,
expectTimestamp bool,
prepublishRetained bool,
) *Stat {
errCh := make(chan error)
Expand All @@ -88,6 +91,7 @@ func runSubPrepublishRetained(
expectRetained: 0,
expectPublished: expectRetained,
repeat: 1,
expectTimestamp: false, // this is for retained messages we pre-publish, no timestamp there
}
go r.receive(receiverReadyCh, statsCh, errCh)
<-receiverReadyCh
Expand All @@ -98,9 +102,10 @@ func runSubPrepublishRetained(
messages: expectRetained,
topics: expectRetained,
messageOpts: messageOpts,
timestamp: false,
}
p.messageOpts.retain = true
go p.publish(nil, errCh, true)
go p.publish(nil, errCh)

// wait for the initial subscription to have received all messages
timeout := time.NewTimer(Timeout)
Expand All @@ -127,6 +132,7 @@ func runSubPrepublishRetained(
expectRetained: expectRetained,
expectPublished: expectPublished,
repeat: repeat,
expectTimestamp: expectTimestamp,
}
go r.receive(nil, statsCh, errCh)
}
Expand Down
2 changes: 1 addition & 1 deletion command-subret.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newSubRetCommand() *cobra.Command {
}

func (c *subretCommand) run(_ *cobra.Command, _ []string) {
total := runSubPrepublishRetained(c.subscribers, c.repeat, c.messages, 0, c.messageOpts, true)
total := runSubPrepublishRetained(c.subscribers, c.repeat, c.messages, 0, c.messageOpts, false, true)
bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
}
13 changes: 7 additions & 6 deletions publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ type messageOpts struct {
type publisher struct {
messageOpts

mps int
messages int
topics int
clientID string
mps int
messages int
topics int
clientID string
timestamp bool
}

func (p *publisher) publish(msgCh chan *Stat, errorCh chan error, timestamp bool) {
func (p *publisher) publish(msgCh chan *Stat, errorCh chan error) {
cl, _, cleanup, err := connect(p.clientID, CleanSession)
if err != nil {
log.Fatal(err)
Expand All @@ -61,7 +62,7 @@ func (p *publisher) publish(msgCh chan *Stat, errorCh chan error, timestamp bool
// is always terminated with a '-', which can not be part of the random
// fill. payload is then filled to the requested size with random data.
payload := randomPayload(p.size)
if timestamp {
if p.timestamp {
structuredPayload, _ := json.Marshal(PubValue{
Seq: n,
Timestamp: time.Now().UnixNano(),
Expand Down
20 changes: 12 additions & 8 deletions receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type receiver struct {
expectRetained int // expect to receive this many retained messages.
expectPublished int // expect to receive this many published messages.
repeat int // Number of times to repeat subscribe/receive/unsubscribe.
expectTimestamp bool // Expect a timestamp in the payload.

cRetained atomic.Int32 // Count of retained messages received.
cPublished atomic.Int32 // Count of published messages received.
Expand Down Expand Up @@ -128,15 +129,18 @@ func (r *receiver) msgHandler(client paho.Client, msg paho.Message) {
return
}

v := PubValue{}
body := msg.Payload()
if i := bytes.IndexByte(body, '\n'); i != -1 {
body = body[:i]
}
if err := json.Unmarshal(body, &v); err != nil {
log.Fatalf("Error parsing message JSON: %v", err)
elapsed := time.Since(r.start)
if r.expectTimestamp {
v := PubValue{}
body := msg.Payload()
if i := bytes.IndexByte(body, '\n'); i != -1 {
body = body[:i]
}
if err := json.Unmarshal(body, &v); err != nil {
log.Fatalf("Error parsing message JSON: %v", err)
}
elapsed = time.Since(time.Unix(0, v.Timestamp))
}
elapsed := time.Since(time.Unix(0, v.Timestamp))
logOp(clientID, "RPUB ->", elapsed, "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), msg.Qos())

dur := r.durPublished.Add(int64(elapsed))
Expand Down

0 comments on commit b4ee55a

Please sign in to comment.