-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
61 lines (53 loc) · 1.49 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package streams
// Consumer is the interface used by Stream.ForEach.
//
// A Consumer is handed values one at a time through Accept and can be
// combined with further consumers through AndThen.
type Consumer[T any] interface {
// Accept is called by the user of the consumer to supply a value.
// The returned error reports whether consuming v failed.
Accept(v T) error
// AndThen creates a new consumer from the current one with a subsequent
// action (after) to be performed.
//
// Multiple consumers can be chained together as one using this method.
AndThen(after Consumer[T]) Consumer[T]
}
// NewConsumer wraps the function f in a Consumer.
//
// A nil f yields a nil Consumer; otherwise the returned consumer invokes f
// for every value passed to its Accept method.
func NewConsumer[T any](f ConsumerFunc[T]) Consumer[T] {
	if f != nil {
		return consumer[T]{f: f}
	}
	return nil
}
// consumer is the Consumer implementation built by NewConsumer and AndThen.
//
// Accept runs f when it is set and inner otherwise, then runs andThen (when
// set) once that first step has returned a nil error.
type consumer[T any] struct {
// f is the wrapped function; populated by NewConsumer.
f ConsumerFunc[T]
// inner is the consumer run first when f is nil; populated by AndThen.
inner Consumer[T]
// andThen is the optional consumer run after f or inner succeeds.
andThen Consumer[T]
}
// Accept supplies v to the consumer.
//
// The wrapped function is run when there is one, otherwise the inner
// consumer is run. If that first step returns nil and a follow-up consumer
// was chained with AndThen, v is handed to it as well. The first non-nil
// error ends the chain and is returned.
func (c consumer[T]) Accept(v T) (err error) {
	switch {
	case c.f != nil:
		err = c.f(v)
	default:
		err = c.inner.Accept(v)
	}
	// Stop here on failure, or when nothing was chained after this step.
	if err != nil || c.andThen == nil {
		return err
	}
	return c.andThen.Accept(v)
}
// AndThen returns a new consumer that first runs the receiver and then,
// provided the receiver returned no error, runs after.
//
// Calling AndThen repeatedly chains any number of consumers into one.
func (c consumer[T]) AndThen(after Consumer[T]) Consumer[T] {
	var chained consumer[T]
	chained.inner = c
	chained.andThen = after
	return chained
}
// ConsumerFunc is the function signature used to create a new Consumer.
//
// The function receives the supplied value v and returns an error to report
// failure. ConsumerFunc also has Accept and AndThen methods of its own, so a
// function value can be used wherever a Consumer[T] is expected.
type ConsumerFunc[T any] func(v T) error
// Accept calls the function itself with v and returns whatever error it
// reports.
func (f ConsumerFunc[T]) Accept(v T) error {
	err := f(v)
	return err
}
// AndThen wraps the function with NewConsumer and chains after behind it,
// returning the combined consumer.
func (f ConsumerFunc[T]) AndThen(after Consumer[T]) Consumer[T] {
	first := NewConsumer(f)
	return first.AndThen(after)
}