-
Notifications
You must be signed in to change notification settings - Fork 5
/
dispatcher.go
103 lines (77 loc) · 2.07 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main
import (
"time"
"errors"
)
var (
WorkQueue chan ConnInfo
Workerstopchan = make(chan bool, 1)
WorkerSlice = make([]Worker, 0)
ReadQueue chan ConnInfo
Readerstopchan = make(chan bool, 1)
ReaderSlice = make([]Worker, 0)
)
// This takes an incomming connection and queues it to be read
func QueueRead(Ci ConnInfo) {
ReadQueue <- Ci
}
// This takes connections that have already been read and
// queues them to be processed
func QueueWork(Ci ConnInfo) {
WorkQueue <- Ci
}
func StartWorkers(nworkers int, chanlen int) {
// Init the channels
WorkQueue = make(chan ConnInfo, chanlen)
// Now, create all of our workers.
for i := 0; i < nworkers; i++ {
//fmt.Println("Starting worker", i+1)
worker := NewWorker(WorkQueue, Workerstopchan)
worker.Work()
// Store this worker so we can stop it later
WorkerSlice = append(WorkerSlice, worker)
}
}
func StartReaders(nreaders int, chanlen int) {
// Init the channels
ReadQueue = make(chan ConnInfo, chanlen)
// Now, create all of our readers
for i := 0; i < nreaders; i++ {
reader := NewWorker(ReadQueue, Readerstopchan)
reader.Read()
// Store this worker so we can stop it later
ReaderSlice = append(ReaderSlice, reader)
}
}
func StopWorkers(nwriters int) error {
for _, w := range WorkerSlice {
w.Stop()
}
// As workers stop, they will tell us that. We must wait for them
// so that we can close the logging after the pending work is done
for wstopped := 0; wstopped < nwriters; {
select {
case <-Workerstopchan:
wstopped++
case <-time.After(time.Second * 5):
return errors.New("Timed out waiting for all workers to stop!")
}
}
return nil
}
func StopReaders(nreaders int) error {
for _, w := range ReaderSlice {
w.Stop()
}
// As readers stop, they will tell us that. We must wait for them
// so that we can close the logging after the pending work is done
for rstopped := 0; rstopped < nreaders; {
select {
case <-Readerstopchan:
rstopped++
case <-time.After(time.Second * 5):
return errors.New("Timed out waiting for all readers to stop!")
}
}
return nil
}