-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.go
92 lines (78 loc) · 1.63 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package gpool
import (
"sync"
)
// Pool is a fixed-size worker pool. Tasks handed to Start are wrapped in Jobs,
// processed by a set of worker goroutines, and every outcome is passed to a
// result callback.
type Pool struct {
	// cap is the number of worker goroutines started by process and the
	// buffer size NewPool gives to the jobs and results channels.
	cap int
	// jobs carries work from allocate to the workers; allocate closes it once
	// every task has been queued.
	jobs chan Job
	// results carries outcomes from the workers to collect; process closes it
	// after all workers have returned.
	results chan Result
	// done is created by Start and receives one value from collect when the
	// results channel has been drained.
	done chan struct{}
	// completed is set to true by collect and read by IsCompleted. It is a
	// plain bool with no synchronisation of its own.
	completed bool
}
// NewPool returns a Pool that processes up to cap tasks concurrently.
//
// cap is used both as the number of worker goroutines launched by Start and
// as the buffer size of the internal jobs and results channels. Values below
// 1 are raised to 1: a negative size makes the channel allocation panic, and
// a size of 0 would start no workers at all, so Start would return without
// processing anything while the goroutine feeding the jobs channel stays
// blocked forever.
func NewPool(cap int) *Pool {
	if cap < 1 {
		cap = 1
	}
	return &Pool{
		cap:     cap,
		jobs:    make(chan Job, cap),
		results: make(chan Result, cap),
	}
}
// Start runs every element of tasks through procFunc on the pool's workers,
// feeds each outcome to resFunc, and blocks until all results have been
// handled.
//
// Three stages run concurrently: allocate queues the tasks, process runs the
// workers, and collect drains the results and signals on p.done when it is
// finished. The channels created by NewPool are closed during the run.
func (p *Pool) Start(tasks []any, procFunc TaskProcessorFn, resFunc ResultProcessorFn) {
	p.done = make(chan struct{})

	stages := []func(){
		func() { p.allocate(tasks) },
		func() { p.collect(resFunc) },
		func() { p.process(procFunc) },
	}
	for _, stage := range stages {
		go stage()
	}

	// collect sends exactly one value once the results channel is drained.
	<-p.done
}
// allocate wraps each task in a Job whose id is the task's index in tasks and
// queues it on p.jobs. The channel is closed on return so the workers' range
// loops terminate.
func (p *Pool) allocate(tasks []any) {
	defer close(p.jobs)

	for idx := range tasks {
		p.jobs <- Job{id: idx, task: tasks[idx]}
	}
}
//func (p *Pool) process(processor TaskProcessorFn) {
// defer close(p.results)
// eg := new(errgroup.Group)
// eg.SetLimit(p.cap)
// for job := range p.jobs {
// eg.Go(wraper.RecoveredFn(func() error {
// err := processor(job.task)
// p.results <- Result{job, err}
// return err
// }))
// }
// _ = eg.Wait()
//}
// process starts p.cap worker goroutines, waits for all of them to return,
// and then closes p.results so that collect can finish.
func (p *Pool) process(processor TaskProcessorFn) {
	var workers sync.WaitGroup
	defer close(p.results)

	for remaining := p.cap; remaining > 0; remaining-- {
		workers.Add(1)
		go p.work(&workers, processor)
	}
	workers.Wait()
}
// work is the body of a single worker goroutine. It takes jobs from p.jobs
// until that channel is closed, runs processor on each job's task, and
// publishes the job together with the returned error on p.results. wg is
// marked done when the worker exits.
func (p *Pool) work(wg *sync.WaitGroup, processor TaskProcessorFn) {
	defer wg.Done()

	for {
		job, ok := <-p.jobs
		if !ok {
			// jobs channel closed and drained: nothing left to do.
			return
		}
		err := processor(job.task)
		p.results <- Result{Job: job, Err: err}
	}
}
// collect drains p.results, passing every Result to proc, and then reports
// completion on p.done.
//
// The loop ends when p.results is closed, which process does after all
// workers have returned. proc is only ever called from this goroutine.
func (p *Pool) collect(proc ResultProcessorFn) {
	for result := range p.results {
		// The callback's error is discarded: collect has no caller to return
		// it to, and stopping here would leave workers blocked sending on
		// p.results.
		_ = proc(result)
	}

	// Set the flag before signalling. The send on p.done happens-before the
	// matching receive in Start, so once Start has returned IsCompleted is
	// guaranteed to report true. Signalling first allowed Start to return
	// while the flag was still false.
	p.completed = true
	p.done <- struct{}{}
}
// IsCompleted reports whether collect has finished draining the results of a
// Start call. The flag is read without any locking or atomics.
// NOTE(review): calling this from another goroutine while Start is still
// running looks like a data race — confirm intended usage.
func (p *Pool) IsCompleted() bool {
	return p.completed
}
// TaskProcessorFn processes a single task. Workers call it once per job, and
// the error it returns is stored in the Err field of that job's Result.
type TaskProcessorFn func(task any) error
// ResultProcessorFn handles the Result of one processed job. The pool's
// collect loop calls it for every result and discards the error it returns.
type ResultProcessorFn func(result Result) error
// Job is one unit of work queued for the pool's workers.
type Job struct {
	// id is the index of the task in the slice passed to Start.
	id int
	// task is the caller-supplied value handed to the TaskProcessorFn.
	task any
}
// Result pairs a processed Job with the outcome of processing it.
type Result struct {
	// Job is the job that was processed.
	Job Job
	// Err is the error returned by the TaskProcessorFn for this job.
	Err error
}