-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.go
71 lines (59 loc) · 1.66 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package prevalentify
import (
"context"
"image/color"
"sync"
)
// ProcessResult is the outcome of processing a single image. It pairs a reference to the ImageSource the image
// came from with the top N most prevalent colors found in that image.
type ProcessResult struct {
// ImageSource references the source of the image that was processed.
ImageSource ImageSource
// Colors holds the most prevalent colors of the image.
Colors []color.Color
}
// Processor reads images off of the given channel (the resolver's image channel), processes each image, and sends
// the outcome on the returned ProcessResult channel. Errors encountered along the way are sent on the returned
// error channel.
type Processor func(context.Context, <-chan Image) (<-chan ProcessResult, <-chan error)
// ProcessFn defines the function that performs the actual processing of a single Image. It returns the
// ProcessResult for that image, or an error if the image could not be processed.
type ProcessFn func(context.Context, Image) (ProcessResult, error)
// NewProcessorPool builds a Processor backed by a fixed-size pool of worker goroutines.
//
// Each invocation of the returned Processor starts `workers` goroutines that all consume from the same image
// channel and apply processFn to every image they receive. Results and errors are delivered on the two returned
// channels, both of which are closed once every worker has exited.
func NewProcessorPool(workers int, processFn ProcessFn) Processor {
	pool := func(ctx context.Context, image <-chan Image) (<-chan ProcessResult, <-chan error) {
		var (
			results  = make(chan ProcessResult)
			failures = make(chan error)
			pending  sync.WaitGroup
		)

		// Register all workers before any of them starts so that Wait cannot
		// observe a zero counter while workers are still being launched.
		pending.Add(workers)

		runWorker := func() {
			process(ctx, image, results, failures, processFn)
			pending.Done()
		}
		for started := 0; started < workers; started++ {
			go runWorker()
		}

		// Closer goroutine: once the last worker has returned, signal
		// completion to the consumer by closing both output channels.
		go func() {
			pending.Wait()
			close(results)
			close(failures)
		}()

		return results, failures
	}
	return pool
}
// process is the body of a single worker. It repeatedly receives an image from the image channel, runs processFn
// on it, and forwards the outcome: successful results go to out, failures go to errCh.
//
// The worker returns when the image channel is closed or when ctx is cancelled. Sends on out and errCh also
// watch ctx, so a worker never stays blocked on a send after cancellation when the consumer has stopped
// receiving; a result or error that is pending at that moment is dropped.
func process(ctx context.Context, image <-chan Image, out chan ProcessResult, errCh chan error, processFn ProcessFn) {
	for {
		select {
		case <-ctx.Done():
			return
		case img, ok := <-image:
			if !ok {
				// Input exhausted: the producer closed the channel.
				return
			}

			result, err := processFn(ctx, img)
			if err != nil {
				// Report the failure, but give up if the context is
				// cancelled while nobody is reading the error channel.
				select {
				case errCh <- err:
				case <-ctx.Done():
					return
				}
				continue
			}

			// Same cancellation guard for the success path.
			select {
			case out <- result:
			case <-ctx.Done():
				return
			}
		}
	}
}