-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathprocessor.go
167 lines (146 loc) · 3.9 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package kawa
import (
"context"
"errors"
"fmt"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// tracer creates the spans emitted by Processor.handle around each
// recv/handle/send cycle. It is initialized directly rather than in an
// init() function, which existed only to perform this one assignment.
// The explicit trace.Tracer type is kept so the trace import stays in use.
var tracer trace.Tracer = otel.Tracer("kawa/processor")
// Processor receives messages of type T1 from a Source, passes each one to a
// Handler, and sends the Handler's output messages of type T2 to a
// Destination. Construct one with New and start it with Run.
type Processor[T1, T2 any] struct {
	src     Source[T1]      // source messages are received from (p.src.Recv)
	dst     Destination[T2] // destination handled messages are sent to (p.dst.Send)
	handler Handler[T1, T2] // transforms one T1 message into zero or more T2 messages

	// parallelism is the number of concurrent handle loops Run starts.
	// New clamps values below 1 up to 1.
	parallelism int

	// tracing/metrics are set from the Tracing/Metrics options.
	// NOTE(review): neither flag is read anywhere in this file — spans are
	// created unconditionally in handle. Confirm intended behavior.
	tracing bool
	metrics bool
}
// Config carries the three required collaborators for a Processor.
// All three fields must be non-nil; New returns an error otherwise.
type Config[T1, T2 any] struct {
	Source      Source[T1]      // produces the messages to process
	Destination Destination[T2] // receives the handler's output
	Handler     Handler[T1, T2] // maps each T1 message to zero or more T2 messages
}
// Option mutates an Options value; pass constructors such as Parallelism,
// Tracing, or Metrics to New.
type Option func(*Options)

// Options holds the tunable settings that Option functions set and that New
// copies onto the Processor.
type Options struct {
	Parallelism int  // number of concurrent worker loops; New clamps values < 1 to 1
	Tracing     bool // stored on the Processor; NOTE(review): flag is not read in this file
	Metrics     bool // stored on the Processor; NOTE(review): flag is not read in this file
}
func Parallelism(n int) func(*Options) {
return func(o *Options) {
o.Parallelism = n
}
}
func Tracing(b bool) func(*Options) {
return func(o *Options) {
o.Tracing = b
}
}
func Metrics(b bool) func(*Options) {
return func(o *Options) {
o.Metrics = b
}
}
// New instantiates a new Processor from the given Config and functional
// options. `Processor.Run` must be called after calling `New` before events
// will be processed. All three Config fields are required.
func New[T1, T2 any](c Config[T1, T2], opts ...Option) (*Processor[T1, T2], error) {
	// Validate the required collaborators up front.
	switch {
	case c.Source == nil || c.Destination == nil:
		return nil, errors.New("both Source and Destination required")
	case c.Handler == nil:
		return nil, errors.New("handler required. Have you considered kawa.Pipe?")
	}

	// Fold the functional options into a single settings value.
	var settings Options
	for _, apply := range opts {
		apply(&settings)
	}

	// Guarantee at least one worker loop.
	workers := settings.Parallelism
	if workers < 1 {
		workers = 1
	}

	return &Processor[T1, T2]{
		src:         c.Source,
		dst:         c.Destination,
		handler:     c.Handler,
		parallelism: workers,
		tracing:     settings.Tracing,
		metrics:     settings.Metrics,
	}, nil
}
// handle runs one worker loop: receive a message from the source, process it
// with the handler, and send the handler's output to the destination. It
// returns only on error (Run stops workers by canceling ctx, which surfaces
// as an error from Recv/Handle/Send); each error is wrapped with the stage
// it came from.
//
// Fix: the original leaked spans on every error path — recvSpan, hdlSpan,
// sendSpan, and the enclosing handleSpan were never End()ed before an error
// return. Every span is now ended on all paths.
func (p *Processor[T1, T2]) handle(ctx context.Context) error {
	for {
		// Per-iteration root span covering recv+handle+send. ctx is
		// shadowed here, so each iteration starts from the caller's context
		// rather than nesting under the previous iteration's span.
		ctx, handleSpan := tracer.Start(ctx, "kawa.processor.full")

		rctx, recvSpan := tracer.Start(ctx, "kawa.processor.src.recv")
		msg, ack, err := p.src.Recv(rctx)
		recvSpan.End()
		if err != nil {
			handleSpan.End()
			return fmt.Errorf("source: %w", err)
		}

		hctx, hdlSpan := tracer.Start(ctx, "kawa.processor.handler.handle")
		msgs, err := p.handler.Handle(hctx, msg)
		hdlSpan.End()
		if err != nil {
			handleSpan.End()
			return fmt.Errorf("handler: %w", err)
		}

		// If there are no messages, we don't need to send nil to the
		// destination — acknowledge the source message and continue.
		if len(msgs) == 0 {
			handleSpan.End()
			Ack(ack)
			continue
		}

		sctx, sendSpan := tracer.Start(ctx, "kawa.processor.dst.send")
		err = p.dst.Send(sctx, ack, msgs...)
		sendSpan.End()
		if err != nil {
			handleSpan.End()
			return fmt.Errorf("destination: %w", err)
		}
		handleSpan.End()
	}
}
// Run is a blocking call, and runs until either the ctx is canceled, or an
// unrecoverable error is encountered. If any error is returned from a source,
// destination or the handler func, then it's wrapped and returned. If the
// passed-in context is canceled, this will not return the context.Canceled
// error to indicate a clean shutdown was successful. Run will return
// ctx.Err() in other cases where context termination leads to shutdown of the
// processor.
func (p *Processor[T1, T2]) Run(ctx context.Context) error {
	var wg sync.WaitGroup
	wg.Add(p.parallelism)
	// Derive a cancelable context so a single worker failure (or a return
	// from this function) can stop all sibling workers.
	ctx, cancel := context.WithCancel(ctx)
	// Buffered to p.parallelism so every worker can deposit its error
	// without blocking, even after this goroutine has stopped receiving.
	errc := make(chan error, p.parallelism)
	for i := 0; i < p.parallelism; i++ {
		// ctx is passed as an argument so each goroutine closes over its
		// own parameter rather than a shared loop/outer variable.
		go func(c context.Context) {
			if e := p.handle(c); e != nil {
				errc <- e
			}
			wg.Done()
		}(ctx)
	}
	var err error
	// Wait for whichever happens first: context termination or the first
	// worker error. Later worker errors land in the buffered channel and
	// are discarded (see TODO below).
	select {
	case <-ctx.Done():
		// context was stopped by parent's cancel or parent timeout.
		// set err = ctx.Err() *only if* error is *not* context.Canceled,
		// because our contract defines that to be the way callers should stop
		// a worker cleanly.
		if !errors.Is(ctx.Err(), context.Canceled) {
			err = ctx.Err()
		}
	case err = <-errc:
		// All errors are fatal to this worker
		err = fmt.Errorf("worker: %w", err)
	}
	// Stop all the workers on shutdown.
	cancel()
	// TODO: capture errors thrown during shutdown? if we do this, write local
	// err first. it represents first seen
	wg.Wait()
	// Safe to close only after wg.Wait(): no worker can send past this point.
	close(errc)
	return err
}