pipeline is a package that simplifies creating staged pipelines in Go.
It provides a simple API for constructing stages and steps to execute complex tasks. The package supports many features, including concurrent stages, configurable workers, and buffered steps.
At minimum, to create a Pipeline you need a single Stage with a single Step.
Every Step in the Pipeline receives its input and provides its output via channels.
// Requires imports of "fmt" and this pipeline package.
func main() {
	// Create a step with a function that does the work
	step := pipeline.NewStep("the step", func(ctx *pipeline.Context, in <-chan interface{}, out chan interface{}) error {
		// Take the input
		input := (<-in).(string)
		// Do something with it
		input += " pipeline"
		// Pass it along
		out <- input
		return nil
	})

	// Create the pipeline with one stage that contains the step
	p := pipeline.NewPipeline("the pipeline")
	p.AddStage(pipeline.NewStage("the stage", step))

	// Run the pipeline
	in := make(chan interface{})
	go func() {
		in <- "hello"
	}()
	out := p.Process(nil, in)

	// Get the output
	fmt.Println((<-out).(string))
}
A stage is a collection of steps that can be run serially or concurrently.
It can be created using pipeline.NewStage(...), which creates a serial stage by default.
A serial stage will run all the steps in the stage serially, or one after another. The output from one step will flow into the input of the next step.
Serial stages can be created with:
stage := pipeline.NewSerialStage(name, step1, ...)
A concurrent stage runs all the steps in the stage concurrently, or together. Each step takes its input from the output of the previous stage, and the output of every step in this stage is passed on to the next stage.
Concurrent stages can be created with:
stage := pipeline.NewConcurrentStage(name, step1, ...)
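To make the difference between the two stage types concrete, here is a minimal sketch built from plain goroutines and channels rather than the package's API. The names step, serialStage, concurrentStage, suffix, source, and collect are hypothetical, and the sketch assumes the steps of a concurrent stage share one input channel (so each value is handled by exactly one step); the package may distribute input differently.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// step is a hypothetical, simplified step: it consumes in until it is
// closed and writes results to out. It is NOT the package's step signature.
type step func(in <-chan string, out chan<- string)

// start runs s in its own goroutine and closes its output when s returns.
func start(s step, in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		s(in, out)
	}()
	return out
}

// serialStage chains the steps: each step's output is the next step's input.
func serialStage(in <-chan string, steps ...step) <-chan string {
	for _, s := range steps {
		in = start(s, in)
	}
	return in
}

// concurrentStage runs every step on the same input channel at once and
// merges all of their outputs into one channel for the next stage.
func concurrentStage(in <-chan string, steps ...step) <-chan string {
	merged := make(chan string)
	var wg sync.WaitGroup
	for _, s := range steps {
		wg.Add(1)
		go func(out <-chan string) {
			defer wg.Done()
			for v := range out {
				merged <- v
			}
		}(start(s, in))
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// suffix returns a step that appends tag to every value it reads.
func suffix(tag string) step {
	return func(in <-chan string, out chan<- string) {
		for v := range in {
			out <- v + tag
		}
	}
}

// source emits the given values on a channel and then closes it.
func source(vals ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// collect drains a channel into a sorted slice (concurrent order varies).
func collect(ch <-chan string) []string {
	var got []string
	for v := range ch {
		got = append(got, v)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(collect(serialStage(source("a", "b"), suffix("1"), suffix("2")))) // [a12 b12]

	merged := collect(concurrentStage(source("a", "b", "c"), suffix("x"), suffix("y")))
	fmt.Println(len(merged)) // 3: each value is handled by exactly one of the two steps
}
```

In the serial case both suffixes are applied to every value; in the concurrent case each value picks up only one of them, and the merged output order is not deterministic.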
A step is a single processing unit of a pipeline. It takes input in the form of an interface{} from a channel, does some work on it, and then should provide the output to another channel.
Steps can be configured in various ways and can modify the pipeline to provide flexibility. The kinds of steps are:
- worker
- buffered
- fan out
Steps can be created using pipeline.NewStep(...), which by default creates a worker step with a worker count of 1.
A worker step lets you define the number of worker goroutines to spawn for this step. This effectively makes the step concurrent by allowing multiple goroutines to process input.
Worker steps can be created with:
step := pipeline.NewWorkerStep(name, workerCount, stepFn)
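A worker step behaves like a classic worker pool. The sketch below uses plain channels and a hypothetical workerStep helper rather than the package's NewWorkerStep: it starts workerCount goroutines that all read from one shared input channel, so each value is processed once, and closes the output after every worker has returned.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// workerStep is a hypothetical sketch of a worker step: workerCount
// goroutines read from the same input channel, so each value is handled
// by exactly one worker. The output is closed once all workers finish.
func workerStep(workerCount int, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()

	var got []int
	for v := range workerStep(3, in, func(v int) int { return v * v }) {
		got = append(got, v)
	}
	sort.Ints(got)   // workers finish in any order
	fmt.Println(got) // [1 4 9 16 25]
}
```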
A buffered step has a buffered output channel, so results can queue up while processing continues. Sending to the output channel does not block while the buffer still has space, which lets the step keep reading from its input channel.
Buffered steps can be created with:
step := pipeline.NewBufferedStep(name, stepFn)
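The behaviour a buffered step relies on is ordinary Go channel buffering. This minimal sketch (the trySend helper is hypothetical, not part of the package) counts how many values can be sent with no receiver before a send would block; it uses a non-blocking select only so the program can report the count instead of hanging.

```go
package main

import "fmt"

// trySend pushes the values 0..n-1 into a channel with the given buffer
// size while nothing is receiving. The select with a default case makes
// each send non-blocking so the function can report how many succeeded.
func trySend(bufferSize, n int) int {
	out := make(chan int, bufferSize)
	sent := 0
	for i := 0; i < n; i++ {
		select {
		case out <- i:
			sent++ // room in the buffer: a step could keep reading input
		default:
			return sent // buffer full: a plain send would block here
		}
	}
	return sent
}

func main() {
	fmt.Println(trySend(0, 5)) // 0: an unbuffered channel needs a ready receiver
	fmt.Println(trySend(3, 5)) // 3
	fmt.Println(trySend(8, 5)) // 5
}
```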
A fan out step replicates, or fans out, the input channel across a number of workers. In this model, the worker count sets how many concurrent copies of the step receive the input data. Note that every copy receives every input value, so the data is processed redundantly.
Fan out steps can be created with:
step := pipeline.NewFanOutStep(name, workerCount, stepFn)
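The difference from a worker step is that a fan-out step copies each value to every worker instead of sharing it among them. A minimal sketch under that reading, using a hypothetical fanOut helper rather than the package's NewFanOutStep: each worker gets a private channel, and a broadcaster goroutine forwards every input value to all of them, so two inputs and three workers yield six results.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical sketch of a fan-out step: every input value is
// copied to each of workerCount workers, so each worker sees the full
// stream and the data is processed redundantly.
func fanOut(workerCount int, in <-chan int, fn func(worker, v int) string) <-chan string {
	out := make(chan string)
	copies := make([]chan int, workerCount)
	var wg sync.WaitGroup
	for w := range copies {
		copies[w] = make(chan int)
		wg.Add(1)
		go func(w int, ch <-chan int) {
			defer wg.Done()
			for v := range ch {
				out <- fn(w, v)
			}
		}(w, copies[w])
	}
	// Broadcast each input value to every worker's private channel.
	go func() {
		for v := range in {
			for _, ch := range copies {
				ch <- v
			}
		}
		for _, ch := range copies {
			close(ch)
		}
	}()
	// Close the output once every worker has drained its copy.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		in <- 1
		in <- 2
	}()

	count := 0
	for r := range fanOut(3, in, func(w, v int) string { return fmt.Sprintf("worker %d got %d", w, v) }) {
		fmt.Println(r)
		count++
	}
	fmt.Println(count, "results") // 2 inputs x 3 workers = 6 results
}
```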
Progress of the pipeline can be tracked in a few ways:
- elapsed time
- state changes
- alternate progress updates
Calling ElapsedTime() returns a time.Duration measured from the start time of the pipeline.
// Get elapsed time
t := pipeline.ElapsedTime()
fmt.Printf("pipeline has been running for %f seconds\n", t.Seconds())
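Conceptually, an elapsed time like this is time.Since applied to a recorded start time. The tracker type below is a hypothetical stand-in used only to show that calculation; it is not the package's Pipeline type.

```go
package main

import (
	"fmt"
	"time"
)

// tracker is a hypothetical stand-in for a pipeline: it records when
// processing started so that ElapsedTime can be computed on demand.
type tracker struct {
	start time.Time
}

// Start records the current time as the start of processing.
func (t *tracker) Start() { t.start = time.Now() }

// ElapsedTime returns the time that has passed since Start was called.
func (t *tracker) ElapsedTime() time.Duration { return time.Since(t.start) }

func main() {
	var p tracker
	p.Start()
	time.Sleep(50 * time.Millisecond) // simulate work
	fmt.Printf("pipeline has been running for %f seconds\n", p.ElapsedTime().Seconds())
}
```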
Pipelines emit a State on a channel for several kinds of change. Each State includes a Status and a progress percentage, where progress is based on the total number of steps in the pipeline.
// Listen to state changes until the pipeline is done
for {
	select {
	case state := <-pipeline.State():
		fmt.Printf("name: %s status: %s progress: %f\n", state.Name, state.Status.String(), state.Progress)
	case <-pipeline.Dead():
		return
	}
}
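A minimal sketch of the producer side of this pattern, assuming a hypothetical State struct with the three fields used above and a Status reduced to a plain string: one State is sent per completed step, with progress computed from the total step count, and the dead channel is closed after the last send so the listening loop can return.

```go
package main

import "fmt"

// State is a hypothetical mirror of the fields used above; the package's
// real State and Status types may differ.
type State struct {
	Name     string
	Status   string
	Progress float64 // percentage of steps completed
}

// run emits one State per completed step, computing progress from the
// total number of steps, and closes dead when it is finished.
func run(steps []string, states chan<- State, dead chan<- struct{}) {
	defer close(dead)
	for i, name := range steps {
		states <- State{
			Name:     name,
			Status:   "completed",
			Progress: float64(i+1) / float64(len(steps)) * 100,
		}
	}
}

// listen consumes state changes until dead is closed and returns the last
// progress value seen. Because states is unbuffered, every send has been
// received before run closes dead.
func listen(steps []string) float64 {
	states := make(chan State)
	dead := make(chan struct{})
	go run(steps, states, dead)

	last := 0.0
	for {
		select {
		case s := <-states:
			fmt.Printf("name: %s status: %s progress: %.1f\n", s.Name, s.Status, s.Progress)
			last = s.Progress
		case <-dead:
			return last
		}
	}
}

func main() {
	fmt.Println("final progress:", listen([]string{"fetch", "parse", "store", "report"}))
}
```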
Pipelines also provide an alternative approach to measuring progress. In this model, the pipeline.Context provided to each Step can be used to set the total units of work and to increment the completed units.
pipeline.Total(unitsOfWork)
func step(ctx *pipeline.Context, in <-chan interface{}, out chan interface{}) error {
	// Do some work
	// ...

	// Update progress markers
	ctx.Inc()
	return nil
}
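One way to picture the Total/Inc model is a pair of counters: a declared total and a count of finished units, with progress as their ratio. The progress type below is hypothetical (the package's Context may track this differently) and uses sync/atomic so that several worker goroutines can call Inc safely.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// progress is a hypothetical sketch of unit-of-work tracking: Total sets
// how many units are expected and Inc records one finished unit. It is
// safe for concurrent use because steps may run in several goroutines.
type progress struct {
	total int64
	done  int64
}

// Total declares how many units of work are expected.
func (p *progress) Total(n int64) { atomic.StoreInt64(&p.total, n) }

// Inc records one finished unit of work.
func (p *progress) Inc() { atomic.AddInt64(&p.done, 1) }

// Percent reports completed work as a percentage of the declared total.
func (p *progress) Percent() float64 {
	total := atomic.LoadInt64(&p.total)
	if total == 0 {
		return 0
	}
	return float64(atomic.LoadInt64(&p.done)) / float64(total) * 100
}

func main() {
	var p progress
	p.Total(8)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four concurrent workers, two units each
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.Inc()
			p.Inc()
		}()
	}
	wg.Wait()
	fmt.Printf("%.0f%%\n", p.Percent()) // 100%
}
```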
Contributions are what make the open-source community such an amazing place to learn, inspire, and create. Any contributions you make are greatly appreciated.
- Fork the Project
- Create your Feature Branch (git checkout -b feature/AmazingFeature)
- Commit your Changes (git commit -m 'Add some AmazingFeature')
- Push to the Branch (git push origin feature/AmazingFeature)
- Open a Pull Request
Distributed under the MIT License.