-
Notifications
You must be signed in to change notification settings - Fork 0
/
imq_hub.go
91 lines (74 loc) · 1.9 KB
/
imq_hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package uwe
import (
"context"
)
type IMQBroker interface {
DefaultBus() SenderBus
AddWorker(name WorkerName) Mailbox
Init() error
Serve(ctx context.Context)
}
type Broker struct {
defaultChanLen int
workersHub map[WorkerName]chan<- *Message
workersMessages chan *Message
}
func NewBroker(defaultChanLen int) *Broker {
if defaultChanLen < 1 {
defaultChanLen = 1
}
return &Broker{
workersHub: map[WorkerName]chan<- *Message{},
workersMessages: make(chan *Message, defaultChanLen)}
}
func (hub *Broker) DefaultBus() SenderBus {
return NewSenderBus(hub.workersMessages)
}
func (hub *Broker) AddWorker(name WorkerName) Mailbox {
workerDirectChan := make(chan *Message, hub.defaultChanLen)
hub.workersHub[name] = workerDirectChan
return NewBus(name, workerDirectChan, hub.workersMessages)
}
func (hub *Broker) Init() error { return nil }
func (hub *Broker) Serve(ctx context.Context) {
for {
select {
case msg := <-hub.workersMessages:
if msg == nil {
continue
}
switch msg.Target {
case TargetSelfInit:
_, ok := hub.workersHub[msg.Sender]
if ok {
continue
}
bus, ok := msg.Data.(chan *Message)
if ok {
hub.workersHub[msg.Sender] = bus
}
case TargetBroadcast:
for to := range hub.workersHub {
if to == msg.Sender {
continue
}
b := hub.workersHub[to]
go sendMsg(b, *msg)
}
default:
if b, ok := hub.workersHub[msg.Target]; ok {
go sendMsg(b, *msg)
}
}
case <-ctx.Done():
return
}
}
}
func sendMsg(bus chan<- *Message, msg Message) { bus <- &msg }
// NopBroker is an empty IMQBroker
type NopBroker struct{}
func (*NopBroker) DefaultBus() SenderBus { return &NopMailbox{} }
func (*NopBroker) AddWorker(name WorkerName) Mailbox { return &NopMailbox{} }
func (*NopBroker) Init() error { return nil }
func (*NopBroker) Serve(ctx context.Context) {}