-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
133 lines (114 loc) · 3.42 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package gopubsub
import "log"
// PubSub is the main struct around which the API revolves.
//
// The subscriber list and the last publication are only touched by the
// goroutine started in New; the exported methods reach that goroutine through
// the operation channels declared below.
type PubSub struct {
	// subscribers is the set of channels currently receiving publications.
	subscribers []chan interface{}
	// lastPublication is the most recently published value, or nil if nothing
	// has been published yet.
	lastPublication interface{}
	// closed records that Close has already been called. It is read and
	// written directly by Close, without synchronisation.
	closed bool

	// Operations
	//
	// newsub carries subscription requests.
	newsub chan subscribeOp
	// pub carries values to publish.
	pub chan interface{}
	// delsub carries channels to remove from subscribers.
	delsub chan (<-chan interface{})
	// close carries the shutdown signal.
	close chan struct{}
}
// subscribeOp is the request sent over PubSub.newsub to register a subscriber.
type subscribeOp struct {
	// channel is the subscriber's channel to add to the subscriber list.
	channel chan interface{}
	// getInitial asks for the last published value (if any) to be delivered
	// on channel as soon as the subscription is registered.
	getInitial bool
}
// New creates a pointer to a PubSub and starts the goroutine that serves it.
//
// PubSub uses a []chan interface{} to keep the list of subscribers, and the
// `cap` parameter is only the initial headroom of that slice, not a limit on
// the number of subscribers; the slice grows as needed.
//
// All state changes (subscribe, unsubscribe, publish, close) are performed by
// a single goroutine that receives requests over the PubSub's operation
// channels. That goroutine exits once a close request has been handled: every
// subscriber channel is closed, the subscriber list is released and
// "Loop closed" is logged. Because nothing receives on the operation channels
// after that point, the PubSub must not be used once it has been closed.
func New(cap int) *PubSub {
	ps := &PubSub{
		subscribers:     make([]chan interface{}, 0, cap),
		newsub:          make(chan subscribeOp),
		pub:             make(chan interface{}),
		delsub:          make(chan (<-chan interface{})),
		close:           make(chan struct{}),
		lastPublication: nil,
	}

	go func() {
		defer log.Println("Loop closed")
		for {
			select {
			case op := <-ps.newsub:
				// New subscriber.
				ps.subscribers = append(ps.subscribers, op.channel)
				if op.getInitial && ps.lastPublication != nil {
					// The subscriber wants the last published data. This is a
					// blocking send, so op.channel needs free buffer space.
					op.channel <- ps.lastPublication
				}

			case c := <-ps.delsub:
				// Remove subscriber: shift the tail down over the removed
				// entry and nil the vacated slot so the backing array does
				// not keep the channel alive.
				for i, suber := range ps.subscribers {
					if suber != c {
						continue
					}
					last := len(ps.subscribers) - 1
					copy(ps.subscribers[i:], ps.subscribers[i+1:])
					ps.subscribers[last] = nil
					ps.subscribers = ps.subscribers[:last]
					break
				}

			case p := <-ps.pub:
				// Publish: remember the value, then offer it to every
				// subscriber without blocking. A subscriber whose channel
				// cannot accept the value right now misses this publication.
				ps.lastPublication = p
				for _, suber := range ps.subscribers {
					select {
					case suber <- p:
						// Message accepted by the subscriber's channel.
					default:
						// Subscriber is not listening.
					}
				}

			case <-ps.close:
				// Closed: drop the list before closing the channels so no
				// closed channel can ever be sent on again, then leave the
				// loop. A bare `break` here would only leave the select and
				// keep the goroutine running forever.
				subs := ps.subscribers
				ps.subscribers = nil
				for _, suber := range subs {
					close(suber)
				}
				return
			}
		}
	}()

	return ps
}
// Subscribe registers a new subscriber on this PubSub and returns the channel
// on which updates will be delivered. When `getinitial` is true, the most
// recent publication (if there is one) is delivered first, ahead of any
// subsequent updates.
func (ps *PubSub) Subscribe(getinitial bool) <-chan interface{} {
	// A single buffer slot lets the PubSub loop hand over the last
	// publication for getinitial == true without blocking on the subscriber.
	updates := make(chan interface{}, 1)
	ps.newsub <- subscribeOp{
		channel:    updates,
		getInitial: getinitial,
	}
	return updates
}
// Publish hands the given `data` to the PubSub loop for delivery to all
// subscribers. Note that `data` must not be nil: publishing nil panics.
func (ps *PubSub) Publish(data interface{}) {
	if data != nil {
		ps.pub <- data
		return
	}
	panic("Cannot Publish() nil. That is equivalent to a channel closure which is invalid.")
}
// Unsubscribe asks the PubSub loop to drop the channel `c` from the list of
// subscribers. `c` must be a channel previously returned by Subscribe(); the
// request is a plain send on the delsub operation channel.
func (ps *PubSub) Unsubscribe(c <-chan interface{}) {
	ps.delsub <- c
}
// Close disposes of the PubSub. It signals the PubSub loop, which closes all
// the channels that have been handed out by Subscribe(); receives on those
// channels then yield the nil zero value. Only the first call sends the
// signal; later calls do nothing.
//
// The closed flag is checked and set without synchronisation, so Close is not
// safe to call from several goroutines at once.
func (ps *PubSub) Close() {
	if ps.closed {
		return
	}
	ps.closed = true
	ps.close <- struct{}{}
}