-
Notifications
You must be signed in to change notification settings - Fork 1
/
broker.go
107 lines (89 loc) · 2.62 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package noise
// topic holds the subscribers registered for a single event.
type topic struct {
// s lists the subscribers in the order topics.Add appended them.
s []*subscriber
// sMap records, per subscriber, an index into s (assigned by topics.Add).
// NOTE(review): indexes are uint8, so positions above 255 cannot be represented.
sMap map[*subscriber]uint8
// size counts the subscribers; topics.Add increments it and topics.Remove decrements it.
size uint8
}
// Len reports the subscriber count tracked for the topic.
func (tp *topic) Len() uint8 {
	return tp.size
}
// Subscribers returns the topic's subscriber slice.
// The slice is the topic's own storage, not a copy.
func (tp *topic) Subscribers() []*subscriber {
	return tp.s
}
// topics keeps the registered events, mapping each Event to the topic that
// holds its subscribers.
type topics map[Event]*topic
// Get looks up the topic registered for event e.
// It returns nil when no topic is registered for e.
func (t topics) Get(e Event) *topic {
	// Indexing a missing key yields the zero value, which is nil for *topic.
	return t[e]
}
// Add appends subscriber s to the topic registered for event e.
// When no topic exists for e yet, one is created first.
func (t topics) Add(e Event, s *subscriber) {
	entry, found := t[e]
	if !found {
		// First subscriber for this event: start with an empty topic.
		entry = &topic{
			s:    []*subscriber{},
			sMap: make(map[*subscriber]uint8),
		}
		t[e] = entry
	}

	entry.s = append(entry.s, s)
	// Remember where s landed so it can be located again later.
	entry.sMap[s] = uint8(len(entry.s) - 1)
	entry.size++
}
// Remove unsubscribes s from the topic registered for event e.
// It returns true when s was found and removed, and false when either the
// topic or the subscriber is not registered.
func (t topics) Remove(e Event, s *subscriber) bool {
	// Is the topic registered?
	to, ok := t[e]
	if !ok {
		return false
	}

	// Is the subscriber known to this topic?
	idx, ok := to.sMap[s]
	if !ok {
		return false
	}

	// Widen to int so that i+1 cannot wrap around when the index is 255.
	i := int(idx)

	// Drop the subscriber from the slice, preserving the order of the rest.
	to.s = append(to.s[:i], to.s[i+1:]...)
	delete(to.sMap, s)

	// Every subscriber that followed the removed one moved down by one slot;
	// refresh their stored indexes so later removals target the right element.
	for j := i; j < len(to.s); j++ {
		to.sMap[to.s[j]] = uint8(j)
	}

	to.size--
	return true
}
// broker exchanges messages between events and subscribers.
// It receives a published signal and emits it to the subscribers registered
// for the signal's event type.
type broker struct {
topics topics // topic subscriptions, keyed by Event
}
// newBroker builds a broker with an empty topic table.
// size is passed to make as the initial capacity hint for the table.
func newBroker(size int) *broker {
	registry := make(topics, size)
	return &broker{topics: registry}
}
// Register subscribes s to event e by adding it to the broker's topics.
// It returns nothing; topics.Add creates the topic for e when it is missing.
func (b *broker) Register(e Event, s *subscriber) {
b.topics.Add(e, s)
}
// Unregister removes subscriber s from the topic registered for event e.
// It returns the result of topics.Remove: true when s was removed, false
// when the topic or the subscriber is not registered.
func (b *broker) Unregister(e Event, s *subscriber) bool {
return b.topics.Remove(e, s)
}
// Publish emits msg to every subscriber of the topic matching msg.Type(),
// starting one goroutine per subscriber and not waiting for them to finish.
// It returns the topic's subscriber count, or 0 when no topic is registered
// for the signal's event type.
func (b *broker) Publish(msg Signal) uint8 {
	tp := b.topics.Get(msg.Type())
	if tp == nil {
		// Nothing is registered for this event type.
		return 0
	}

	// deliver hands a single signal to a single subscriber.
	deliver := func(target *subscriber, m Signal) {
		target.Emit(m)
	}

	notified := tp.Len()
	for _, target := range tp.Subscribers() {
		go deliver(target, msg)
	}
	return notified
}