-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathbatch.go
131 lines (105 loc) · 3.31 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package batch
import (
log "github.com/Deeptiman/go-batch/logger"
"time"
)
// Batch struct defines the structure payload for a Batch.
//
// Item: channel that contains the Resources object from the client.
// Id: Each item that a client send for the processing marked with Id.
// Semaphore: The ReadWrite locks handle by the Semaphore object, it helps to synchronize the batch processing session.
// Islocked: Whenever the batch processing session starts, Islocked changes to [true], so it will restrict the concurrent batch processing.
// Producer: The BatchItem object send to the Producer for further processing.
// Consumer: The Consumer arranges the prepared []BatchItems for the Workerline.
// Log: Batch processing library uses "github.com/sirupsen/logrus" as logging tool.
type Batch struct {
Item chan interface{}
Id int
Semaphore *Semaphore
Islocked bool
Producer *BatchProducer
Consumer *BatchConsumer
Log *log.Logger
}
// NewBatch creates a new Batch object with BatchProducer & BatchConsumer. The BatchOptions
// sets the MaxItems for a batch and maximum wait time for a batch to complete set by MaxWait.
func NewBatch(opts ...BatchOptions) *Batch {
b := &Batch{
Item: make(chan interface{}),
Log: log.NewLogger(),
}
c := NewBatchConsumer()
p := NewBatchProducer(c.ConsumerFunc)
for _, opt := range opts {
opt(p)
}
b.Producer = p
b.Consumer = c
b.Semaphore = NewSemaphore(int(p.MaxItems))
items = make([]BatchItems, 0, p.MaxItems)
return b
}
// StartBatchProcessing function to begin the BatchProcessing library and to start the Producer/
// Consumer listeners. The ReadItems goroutine will receive the item from a source that keeps
// listening infinitely.
func (b *Batch) StartBatchProcessing() {
b.Semaphore.Lock()
defer b.Semaphore.Unlock()
if b.Islocked {
panic("Concurrent batch processing is not allowed!")
}
go b.Producer.WatchProducer()
go b.Consumer.StartConsumer()
go b.ReadItems()
}
// Unlock function will allow the batch processing to start with the multiple iteration
func (b *Batch) Unlock() {
b.Islocked = false
}
// ReadItems function will run infinitely to listen to the Resource channel and the received
// object marshaled with BatchItem and then send to the Producer Watcher channel for further
// processing.
func (b *Batch) ReadItems() {
b.Islocked = true
for {
select {
case item := <-b.Item:
b.Id++
go func(item interface{}) {
b.Producer.Watcher <- &BatchItems{
Id: b.Id,
Item: item,
}
}(item)
time.Sleep(time.Duration(100) * time.Millisecond)
}
}
}
// SetLogLevel [Info:Debug]
func (b *Batch) SetDebugLogLevel() {
b.Log.SetLogLevel(log.Debug)
}
// StopProducer to exit the Producer line.
func (b *Batch) StopProducer() {
b.Producer.Quit <- true
}
// Stop to run StopProducer/StopConsumer goroutines to quit the execution.
func (b *Batch) Stop() {
go b.StopProducer()
}
// Close is the exit function to terminate the batch processing.
func (b *Batch) Close() {
//b.Log.WithFields(log.Fields{"Remaining Items": len(items)}).Warn("CheckRemainingItems")
b.Log.Infoln("CheckRemainingItems", "Remaining=", len(items))
done := make(chan bool)
go b.Producer.CheckRemainingItems(done)
select {
case <-done:
b.Log.Warn("Done")
b.Semaphore.Lock()
b.Stop()
close(b.Item)
b.Islocked = false
b.Semaphore.Unlock()
}
}