-
Notifications
You must be signed in to change notification settings - Fork 8
/
pipe.go
48 lines (41 loc) · 916 Bytes
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package gonzo
import (
"github.com/omeid/gonzo/context"
)
type pipe struct {
context context.Context
files <-chan File
}
func (p pipe) Context() context.Context {
return p.context
}
func (p pipe) Files() <-chan File {
return p.files
}
// Pipes the current Channel to the give list of Stages and returns the
// last jobs otput pipe.
func (p pipe) Pipe(stages ...Stage) Pipe {
switch len(stages) {
case 0:
return p
case 1:
return makestage(stages[0], p.Context(), p.Files())
default:
return makestage(stages[0], p.Context(), p.Files()).Pipe(stages[1:]...)
}
}
// Waits for the end of channel and closes all the files.
func (p pipe) Wait() error {
var err error
for f := range p.files {
e := f.Close()
if err == nil && e != nil {
err = e
}
}
return err
}
//This is a combination of p.Pipe(....).Wait()
func (p pipe) Then(stages ...Stage) error {
return p.Pipe(stages...).Wait()
}