Skip to content

Commit

Permalink
Optimize message routing using arn.Tree (#1)
Browse files Browse the repository at this point in the history
* Update server tests

* Initial tree implementation

* Add grid test

* Use window pool

* wip

* Refactor

* wip mix

* Add Fmux.Route

* Benchmark

* Refactor tree

* Refactor tree

* Rename func Handle -> AddFilter

* AddFilter and parts

* Add Tree.Match

* Add BenchmarkTree_Match

* Refactor

* Add Tree.Filters

* Use golden

* Match takes slice ref

* Remove method Tree.leafs

* Complete tests

* Clean

* Name package ftree

* Node carries any value

* Add method Tree.Find

* Rename package arn

* Tree.AddFilter returns node

* Remove setting value in node

* Server uses arn.Tree for routing

* Add method Tree.Leafs

* Use sync.RWMutex

* Document arn.Tree

* Test

* Add Example_tree

* Simplify topicFilter

* Remove type topicFilter

* Update changelog
  • Loading branch information
gregoryv authored Oct 15, 2023
1 parent 41d04bd commit a166501
Show file tree
Hide file tree
Showing 10 changed files with 452 additions and 141 deletions.
32 changes: 32 additions & 0 deletions arn/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package arn_test

import (
"fmt"

"github.com/gregoryv/tt/arn"
)

func Example_tree() {
x := arn.NewTree()
x.AddFilter("") // noop
x.AddFilter("#")
x.AddFilter("#")
x.AddFilter("#")
x.AddFilter("+/tennis/#")
x.AddFilter("sport/#")
x.AddFilter("sport/tennis/player1/#")
fmt.Println("filters:", x.Filters())

var result []*arn.Node
topic := "sport/golf"
x.Match(&result, topic)
fmt.Println("topic:", topic)
for _, n := range result {
fmt.Println(n.Filter())
}
// output:
// filters: [# +/tennis/# sport/# sport/tennis/player1/#]
// topic: sport/golf
// #
// sport/#
}
88 changes: 88 additions & 0 deletions arn/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package arn

// NewNode returns a new node using txt as level value. E.g. +, # or a
// word.
func NewNode(txt string) *Node {
return &Node{
txt: txt,
}
}

type Node struct {
// Value is controlled by the caller.
Value any

txt string
parent *Node
children []*Node
}

func (n *Node) match(result *[]*Node, parts []string, i int) {
switch {
case i > len(parts)-1:
*result = append(*result, n)
return

case n.txt == "#":
if parts[0][0] != '$' {
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901246
*result = append(*result, n)
}
return

case n.txt != "+" && n.txt != parts[i]:
return
}

for _, child := range n.children {
child.match(result, parts, i+1)
}
}

func (n *Node) Find(parts []string) (*Node, bool) {
if len(parts) == 0 {
return n, true
}
c := n.FindChild(parts[0])
if c == nil {
return nil, false
}
return c.Find(parts[1:])
}

func (n *Node) FindChild(txt string) *Node {
for _, child := range n.children {
if child.txt == txt {
return child
}
}
return nil
}

func (n *Node) AddChild(c *Node) {
c.parent = n
n.children = append(n.children, c)
}

func (n *Node) Filter() string {
if n.parent == nil {
return n.txt
}
return n.parent.Filter() + "/" + n.txt
}

func (n *Node) Leafs() []*Node {
var leafs []*Node
for _, c := range n.children {
if c.IsLeaf() {
leafs = append(leafs, c)
continue
}
leafs = append(leafs, c.Leafs()...)
}
return leafs
}

func (n *Node) IsLeaf() bool {
return len(n.children) == 0
}
4 changes: 4 additions & 0 deletions arn/testdata/arn.TestTree_Filters
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#
+/tennis/#
sport/#
sport/tennis/player1/#
1 change: 1 addition & 0 deletions arn/testdata/golden.files
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
arn.TestTree_Filters
94 changes: 94 additions & 0 deletions arn/tree.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Package arn provides an MQTT topic filter matcher.
Topic filters are stored in a tree where each node represents one
level in a topic.
*/
package arn

import (
"strings"
"sync"
)

// NewTree returns a new empty topic filter tree.
func NewTree() *Tree {
return &Tree{
root: NewNode(""),
}
}

type Tree struct {
m sync.RWMutex
root *Node
}

// Match populates result with leaf nodes matching the given topic
// name.
func (t *Tree) Match(result *[]*Node, topic string) {
t.m.RLock()
defer t.m.RUnlock()
parts := strings.Split(topic, "/")
for _, child := range t.root.children {
child.match(result, parts, 0)
}
}

// Filters returns all topic filters in the tree
func (t *Tree) Filters() []string {
var filters []string
for _, l := range t.Leafs() {
filters = append(filters, l.Filter())
}
return filters
}

// Leafs returns all topic filters in the tree as nodes.
func (t *Tree) Leafs() []*Node {
t.m.RLock()
defer t.m.RUnlock()
return t.root.Leafs()
}

// AddFilter adds the topic filter to the tree. Returns existing or
// new node for that filter. Returns nil on empty filter.
func (t *Tree) AddFilter(filter string) *Node {
if filter == "" {
return nil
}
t.m.Lock()
defer t.m.Unlock()
parts := strings.Split(filter, "/")
n := t.addParts(t.root, parts)
// t.root is just a virtual parent
for _, top := range t.root.children {
top.parent = nil
}
return n
}

// Find returns node matching the given filter. If not found, nil and
// false is returned.
func (t *Tree) Find(filter string) (*Node, bool) {
if filter == "" {
return nil, false
}
t.m.Lock()
defer t.m.Unlock()
parts := strings.Split(filter, "/")
return t.root.Find(parts)
}

func (t *Tree) addParts(n *Node, parts []string) *Node {
if len(parts) == 0 {
return n
}
parent := n.FindChild(parts[0])
if parent == nil {
parent = NewNode(parts[0])
n.AddChild(parent)
}
// add rest
return t.addParts(parent, parts[1:])
}
163 changes: 163 additions & 0 deletions arn/tree_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
Filters
#
+/tennis/#
sport/#
sport/tennis/player1/#
should all match the following topics
sport/tennis/player1
sport/tennis/player1/ranking
sport/tennis/player1/score/wimbledon
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901241
*/
package arn

import (
"bytes"
"reflect"
"testing"

"github.com/gregoryv/golden"
)

func TestTree_Find(t *testing.T) {
x := NewTree()
n, found := x.Find("")
if found || n != nil {
t.Error("Find returned", n, found)
}

n, found = x.Find("no/such/filter")
if found || n != nil {
t.Error("Find returned", n, found)
}

filter := "store/candy/door/#"
x.AddFilter(filter)
n, found = x.Find(filter)
if !found || n == nil {
t.Error("Find returned", n, found)
}
}

func TestTree_NoMatch(t *testing.T) {
x := newTestTree()
x.AddFilter("garage/+")

var result []*Node
topic := "store/fruit/apple"
x.Match(&result, topic)
if len(result) != 1 {
t.Log("filters: ", x.Filters())
t.Errorf("%s should only match one filter", topic)
}
}

func TestTree_Filters(t *testing.T) {
x := newTestTree()
var buf bytes.Buffer
for _, f := range x.Filters() {
buf.WriteString(f)
buf.WriteString("\n")
}
golden.Assert(t, buf.String())
}

func newTestTree() *Tree {
x := NewTree()
x.AddFilter("") // should result in a noop
x.AddFilter("#")
x.AddFilter("+/tennis/#")
x.AddFilter("sport/#")
x.AddFilter("sport/tennis/player1/#")
return x
}

func TestRouter(t *testing.T) {
t.Run("Tree", func(t *testing.T) {
testRouterMatch(t, NewTree())
})
}

func testRouterMatch(t *testing.T, r Router) {
t.Helper()
exp := []string{
"#",
"+/tennis/#",
"sport/#",
"sport/tennis/player1/#",
}
for _, filter := range exp {
r.AddFilter(filter)
}

topics := []string{
"sport/tennis/player1",
"sport/tennis/player1/ranking",
"sport/tennis/player1/score/wimbledon",
}
var filters []string
var result []*Node
for _, topic := range topics {
t.Run(topic, func(t *testing.T) {
result = result[:0] // reset
r.Match(&result, topic)
filters = filters[:0] // reset
for _, n := range result {
filters = append(filters, n.Filter())
}
if !reflect.DeepEqual(filters, exp) {
t.Log(r)
t.Error("\ntopic: ", topic, "matched by\n", filters, "\nexpected\n", exp)
}
})
}

t.Run("$sys", func(t *testing.T) {
var result []*Node
topic := "$sys/health"
r.Match(&result, topic)
if len(result) > 0 {
t.Error("$sys should not match", r.(*Tree).Filters())
}
})

}

type Router interface {
AddFilter(string) *Node
Match(result *[]*Node, topic string)
}

func BenchmarkTree_Match(b *testing.B) {
benchmarkRouterMatch(b, NewTree())
}

func benchmarkRouterMatch(b *testing.B, r Router) {
b.Helper()
exp := []string{
"#",
"+/tennis/#",
"sport/#",
"sport/tennis/player1/#",
}
for _, filter := range exp {
r.AddFilter(filter)
}
topics := []string{
"sport/tennis/player1",
"sport/tennis/player1/ranking",
"sport/tennis/player1/score/wimbledon",
}
var result []*Node // could make result pooled
for i := 0; i < b.N; i++ {
for _, topic := range topics {
result = result[:0] // reset
r.Match(&result, topic)
}
}
}
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ changes will be documented in this file.

## [unreleased]

- Optimize message routing using arn.Tree
- Support multiple topic filters in subscription
- Add flag tt pub --repeat
- Fix timeout for tt srv
Expand Down
Loading

0 comments on commit a166501

Please sign in to comment.