Skip to content

Commit

Permalink
update: 执行速度
Browse files Browse the repository at this point in the history
  • Loading branch information
xmapst committed Jan 6, 2025
1 parent d2b2261 commit 06e44dc
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ require (
github.com/yargevad/filepathx v1.0.0
github.com/yuin/gopher-lua v1.1.1
go.starlark.net v0.0.0-20241125201518-c05ff208a98f
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20241215155358-4a5509556b9e
Expand Down Expand Up @@ -161,7 +162,6 @@ require (
github.com/xanzy/ssh-agent v0.3.3 // indirect
go.mongodb.org/mongo-driver v1.17.1 // indirect
go.uber.org/mock v0.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.12.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.33.0 // indirect
Expand Down
19 changes: 14 additions & 5 deletions internal/service/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package service
import (
"fmt"
"regexp"
"sync"
"time"

"github.com/pkg/errors"
"github.com/segmentio/ksuid"
"go.uber.org/multierr"

"github.com/xmapst/AutoExecFlow/internal/config"
"github.com/xmapst/AutoExecFlow/internal/queues"
Expand Down Expand Up @@ -114,13 +116,20 @@ func (ts *STaskService) Create(task *types.STaskReq) (err error) {
return err
}

// 使用并行的方式入库
var wg sync.WaitGroup
for _, step := range task.Step {
// save step
stepSvc := Step(task.Name, step.Name)
if err = stepSvc.Create(timeout, step); err != nil {
return err
}
wg.Add(1)
go func(step *types.SStepReq) {
defer wg.Done()
// save step
stepSvc := Step(task.Name, step.Name)
if _err := stepSvc.Create(timeout, step); _err != nil {
err = multierr.Append(err, fmt.Errorf("save step error: %s", _err))
}
}(step)
}
wg.Wait()
// 提交任务
return queues.PublishTask(task.Node, ts.name)
}
Expand Down
68 changes: 46 additions & 22 deletions internal/worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"runtime/debug"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -26,6 +27,10 @@ type sTask struct {
}

func newTask(taskName string) (*sTask, error) {
sTime := time.Now()
defer func() {
logx.Debugln(taskName, "耗时", time.Since(sTime))
}()
t := &sTask{
storage: storage.Task(taskName),
workspace: filepath.Join(config.App.WorkSpace(), taskName),
Expand Down Expand Up @@ -68,21 +73,43 @@ func newTask(taskName string) (*sTask, error) {
// 1. 创建顶点
var stepVertex = make(map[string]*dag.Vertex)
var steps = t.storage.StepNameList("")

// 缓存步骤信息
stepInfo := make(map[string]storage.IStep)
for _, sName := range steps {
// 跳过禁用的步骤
if t.storage.Step(sName).IsDisable() {
logx.Infoln("the step is disabled, no execution required", sName)
_ = t.storage.Step(sName).Update(&models.SStepUpdate{
Message: "the step is disabled, no execution required",
State: models.Pointer(models.StateStopped),
OldState: models.Pointer(models.StatePending),
STime: models.Pointer(time.Now()),
ETime: models.Pointer(time.Now()),
})
continue
}
stepVertex[sName] = dag.NewVertex(sName, newStep(t.storage.Step(sName), t.workspace, t.scriptDir))
stepInfo[sName] = t.storage.Step(sName)
}

// 使用并行的方式创建顶点
var mu sync.Mutex
var wg sync.WaitGroup
for _, sName := range steps {
wg.Add(1)
go func(sName string) {
defer wg.Done()

step := stepInfo[sName]
if step.IsDisable() {
logx.Infoln("the step is disabled, no execution required", sName)
_ = step.Update(&models.SStepUpdate{
Message: "the step is disabled, no execution required",
State: models.Pointer(models.StateStopped),
OldState: models.Pointer(models.StatePending),
STime: models.Pointer(time.Now()),
ETime: models.Pointer(time.Now()),
})
return
}
vertex := dag.NewVertex(sName, newStep(step, t.workspace, t.scriptDir))

mu.Lock()
stepVertex[sName] = vertex
mu.Unlock()

}(sName)
}
wg.Wait()

if len(stepVertex) == 0 {
err = errors.New("no enabled steps")
return nil, err
Expand All @@ -102,17 +129,14 @@ func newTask(taskName string) (*sTask, error) {
logx.Errorln(t.name(), err)
return nil, err
}
err = vertex.WithDeps(func() []*dag.Vertex {
var stepFns []*dag.Vertex
for _, dep := range t.storage.Step(sName).Depend().List() {
_stepFn, _ok := stepVertex[dep]
if !_ok {
continue
}
deps := stepInfo[sName].Depend().List()
var stepFns []*dag.Vertex
for _, dep := range deps {
if _stepFn, _ok := stepVertex[dep]; _ok {
stepFns = append(stepFns, _stepFn)
}
return stepFns
}()...)
}
err = vertex.WithDeps(stepFns...)
if err != nil {
logx.Errorln(t.name(), err)
return nil, err
Expand Down
49 changes: 29 additions & 20 deletions pkg/dag/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,53 +355,62 @@ func (g *Graph) compile() (err error) {
}
g.ctx.Unlock()
}()

var nameMap = make(map[string]bool)

// 循环遍历任务列表 g.vertex,为每个节点设置相应的属性,并根据任务的依赖关系将节点连接起来。
for k := range g.vertex {
if _, ok := nameMap[g.vertex[k].Name()]; ok {
for _, v := range g.vertex {
if _, ok := nameMap[v.Name()]; ok {
return ErrDuplicateVertexName
}
nameMap[g.vertex[k].Name()] = true
nameMap[v.Name()] = true
// 设置依赖数量
g.vertex[k].ndeps = int64(len(g.vertex[k].deps))
v.ndeps = int64(len(v.deps))

// 将当前顶点作为邻接或相邻分配给父顶点。
// 具体地, 将依赖的节点的指针添加到当前节点的 adjs 切片中,表示当前节点依赖于这些节点。
for _, dep := range g.vertex[k].deps {
g.vertex[dep.cid-1].adjs = append(g.vertex[dep.cid-1].adjs, g.vertex[k])
for _, dep := range v.deps {
g.vertex[dep.cid-1].adjs = append(g.vertex[dep.cid-1].adjs, v)
}

// 如果任务没有依赖,将其节点添加到 roots 切片中。
if len(g.vertex[k].deps) == 0 {
g.vertex[k].root = true
if len(v.deps) == 0 {
v.root = true
}
}

// 检查图形是否存在回环, 检查以本节点为起点的子图是否存在回环。
if err = g.detectCircularDependencies(g.vertex[k], []*Vertex{}); err != nil {
// 使用 DFS 进行环检测
visited := make(map[*Vertex]bool)
stack := make(map[*Vertex]bool)
for _, vertex := range g.vertex {
if err = g.detectCircularDependencies(vertex, visited, stack); err != nil {
return err
}
}

return
}

// 使用深度优先搜索 (DFS) 的方式进行回环检测。它从给定的节点开始遍历邻接节点,并在遍历过程中检查是否存在回环。
// 如果发现已访问过的节点,则存在回环,返回 ErrCycleDetected 错误。否则,继续递归遍历邻接节点。
// 为了避免重复访问节点,使用 visited 属性对已访问的节点进行标记。
func (g *Graph) detectCircularDependencies(current *Vertex, path []*Vertex) error {
// 如果发现某个邻接节点已经被访问过(即存在回环)
if current.ctx.visited {
func (g *Graph) detectCircularDependencies(current *Vertex, visited, stack map[*Vertex]bool) error {
if stack[current] {
return ErrCycleDetected
}
current.ctx.visited = true
defer func() {
current.ctx.visited = false
}()
// 递归地遍历节点的邻接节点
for k := range current.adjs {
if err := g.detectCircularDependencies(current.adjs[k], append(path, current)); err != nil {
if visited[current] {
return nil
}

visited[current] = true
stack[current] = true

for _, adj := range current.adjs {
if err := g.detectCircularDependencies(adj, visited, stack); err != nil {
return err
}
}

stack[current] = false
return nil
}

0 comments on commit 06e44dc

Please sign in to comment.