Skip to content

Simply way to control goroutines execution order based on dependencies

License

Notifications You must be signed in to change notification settings

sinomiko/go-flow

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

48 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

GoFlow

GoDoc License Release GoReport Travis Coverage

Safe DAG framework,GoFlow evolution version

Goflow is a simply package to control goroutines execution order based on dependencies. It works similar to async.auto from node.js async package, but for Go.

realease LOG

20211104

  • add task duration
  • panic recover
  • transfer error code, error msg
  • FailThrough On: task panic, after task will continue
  • solve race condition

Install

Install the package with:

go get github.com/sinomiko/go-flow

Import it with:

import "github.com/sinomiko/go-flow"

and use goflow as the package name inside the code.

Example

package main

import (
	"context"
	"fmt"
	"time"

	goflow "github.com/sinomiko/go-flow"
)

// FlowData flow ctx携带信息
type FlowData struct {
	OuterReq         interface{}
	OuterRsp         interface{}
	AttachInfo       interface{}
	ReqType          uint64
	ProcessorID      string
}

// ReqInfo 请求信息
type ReqInfo struct {
	srcID string
}

func main() {
	doFlow1()
	doFlow2()
	doFlow3()
}

var f1 = func(ctx context.Context, r map[string]interface{}) (interface{}, int64, error) {
	fmt.Println("function1 started", r)
	flow_data, ok := ctx.Value("flowData").(*FlowData)
	if !ok || flow_data == nil {
		return nil, 0, fmt.Errorf("Error: flow_data is nil - %v", flow_data)
	}

	outer_req, ok := flow_data.OuterReq.(*ReqInfo)
	if !ok || outer_req == nil {
		return nil, 0, fmt.Errorf("Error: outer_req is nil - %v", outer_req)
	}
	fmt.Println("function1 end", "ctx:", ctx, "srcID:", outer_req.srcID)
	time.Sleep(time.Millisecond * 25)
	return "f1", 0, nil
}

var f2 = func(ctx context.Context, r map[string]interface{}) (interface{}, int64, error) {
	fmt.Println("function2 started", r["Start"])
	time.Sleep(time.Millisecond * 50)
	return "some results", 0, nil // errors.New("Some error")
}
var f3 = func(ctx context.Context, r map[string]interface{}) (interface{}, int64, error) {
	fmt.Println("function3 started", r["Start"])
	time.Sleep(time.Millisecond * 75)
	return nil, 0, nil
}

var f4 = func(ctx context.Context, r map[string]interface{}) (interface{}, int64, error) {
	fmt.Println("function4 started", r)
	time.Sleep(time.Millisecond * 100)
	return "f4 end", 0, nil
}

func doFlow1() {
	ctx := context.Background()
	flowData := &FlowData{
		OuterReq:    &ReqInfo{srcID: "1"},
		OuterRsp:    "",
		ReqType:     1,
		ProcessorID: "Flow1",
	}
	// 特殊数据 在ctx中从头透传到结尾
	valueCtx := context.WithValue(ctx, "flowData", flowData)

	// with valueCtx
	flow := goflow.New().
		Add("f1", false, []string{}, f1).
		Add("f2", false, []string{"f1"}, f2).
		Add("f3", false, []string{"f1"}, f3).
		Add("f4", false, []string{"f2", "f3"}, f4)
	res, err := flow.Do(valueCtx)

	fmt.Println("======flow1 f4: res======== ")
	fmt.Printf("res : %s\n", res["f4"])
	fmt.Println("======flow1 time stats======== ")
	for k, v := range flow.Funcs {
		fmt.Printf("%s -> duration[%d]\n", k, v.Duration)
	}
	fmt.Println("======flow1 result ======== ")
	fmt.Println(res, err)
}

func doFlow2(){
	fmt.Println("======new flow2======== ")
	ctx2 := context.Background()
	// with Ctx
	flow2 := goflow.New().
		Add("f1", false, []string{}, f1).
		Add("f2", false, []string{"f1"}, f2).
		Add("f3", false, []string{"f1"}, f3).
		Add("f4", false, []string{"f2", "f3"}, f4)
	res2, err2 := flow2.Do(ctx2)

	fmt.Println("======flow2 result ======== ")
	fmt.Println(res2, err2)
}

func doFlow3(){
	fmt.Println("======new flow3======== ")
	ctx3 := context.Background()
	fErr := func(ctx context.Context, r map[string]interface{}) (interface{}, int64, error) {
		fmt.Println("fErr started", r)
		time.Sleep(time.Millisecond * 100)
		panic("test")
		return "f4 end", 0, nil
	}
	// with panic func and skip panic
	flow3 := goflow.New().
		Add("fErr", true, []string{}, fErr).
		Add("f3", false, []string{"fErr"}, f3).
		Add("f4", false, []string{"fErr", "f3"}, f4)
	res3, err3 := flow3.Do(ctx3)

	fmt.Println("======flow3 result ======== ")
	fmt.Println(res3, err3)
}

Output will be:

function1 started map[]
function1 end ctx: context.Background.WithValue(type string, val <not Stringer>) srcID: 1
function2 started <nil>
function3 started <nil>
function4 started map[f2:some results f3:<nil>]
======flow1 f4: res======== 
res : f4 end
======flow1 time stats======== 
f3 -> duration[78]
f4 -> duration[105]
f1 -> duration[26]
f2 -> duration[53]
======flow1 result ======== 
...

About

Simply way to control goroutines execution order based on dependencies

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Go 100.0%