-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
router_pipeline.go
80 lines (70 loc) · 1.76 KB
/
router_pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package goul
import (
"errors"
"fmt"
"os"
)
//*** pipeline router -----------------------------------------------
// Pipeline is the pipeline router. Run chains a reader, the pipes registered
// through AddPipe, and a writer, in that order.
type Pipeline struct {
	// Router is embedded. The methods of Pipeline call getReader, getWriter
	// and getLogger, which are presumably promoted from it.
	Router

	// err receives the result of the most recent stage call made by Run.
	err error
	// pipes lists the pipes added through AddPipe, in insertion order.
	pipes []Pipe
}
// Run implements Router. It creates a control channel, passes it to the
// reader, feeds the reader's output through every pipe returned by GetPipes
// (in order), and hands the result to the writer.
//
// On success it returns the control channel given to the reader and the
// channel returned by the writer. It returns an error when the reader or the
// writer is missing, when any stage returns an error, or when a panic is
// recovered while the chain is being built. On every error path, including a
// recovered panic, both returned channels are nil.
func (r *Pipeline) Run() (ctrl, done chan Item, err error) {
	defer func() {
		// The recovered value gets its own name so it does not shadow the
		// receiver r.
		cause := recover()
		if cause == nil {
			return
		}
		fmt.Fprintf(os.Stderr, "Pipeline#Run recovered from panic!\n")
		fmt.Fprintf(os.Stderr, "Probably an inheritance problem of pipeline instance.\n")
		fmt.Fprintf(os.Stderr, "panic: %v\n", cause)
		// ctrl (and possibly done) may already have been assigned when the
		// panic fired. Clear them so a failed Run never hands out channels
		// together with an error, matching the other error returns.
		ctrl, done = nil, nil
		err = errors.New("panic")
	}()

	if r.getReader() == nil || r.getWriter() == nil {
		return nil, nil, errors.New(ErrRouterNoReaderOrWriter)
	}

	ctrl = make(chan Item)

	// ch always holds the output channel of the most recently started stage.
	var ch chan Item
	ch, r.err = r.getReader().Read(ctrl, nil)
	if r.err != nil {
		return nil, nil, r.err
	}

	for _, p := range r.GetPipes() {
		// Each pipe runs in exactly one direction, selected by IsConverter.
		if p.IsConverter() {
			ch, r.err = p.Convert(ch, nil)
		} else {
			ch, r.err = p.Revert(ch, nil)
		}
		if r.err != nil {
			return nil, nil, r.err
		}
	}

	done, r.err = r.getWriter().Write(ch, nil)
	if r.err != nil {
		return nil, nil, r.err
	}

	Log(r.getLogger(), "router", "started ---------------------------------")
	return ctrl, done, nil
}
// AddPipe implements Router. It passes the pipeline's logger to pipe and
// then appends pipe to the end of the pipe list. A panic raised along the way
// is recovered, reported on standard error, and returned as an error.
func (r *Pipeline) AddPipe(pipe Pipe) (err error) {
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		fmt.Fprintf(os.Stderr, "Pipeline#AddPipe recovered from panic!\n")
		fmt.Fprintf(os.Stderr, "Probably an inheritance problem of pipeline instance.\n")
		fmt.Fprintf(os.Stderr, "panic: %v\n", cause)
		err = errors.New("panic")
	}()

	pipe.SetLogger(r.getLogger())
	r.pipes = append(r.pipes, pipe)
	return nil
}
// GetPipes implements Router. It returns the pipes added so far, in insertion
// order. The result is never nil: when no pipe has been added, an empty slice
// is returned instead.
func (r *Pipeline) GetPipes() []Pipe {
	if r.pipes != nil {
		return r.pipes
	}
	return []Pipe{}
}