-
Notifications
You must be signed in to change notification settings - Fork 198
/
workflow.go
129 lines (111 loc) · 3.95 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package pso
import (
"errors"
"fmt"
"time"
"github.com/pborman/uuid"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
// WorkflowResult is the outcome reported by a child optimization workflow.
//
// Both fields must stay exported (uppercase); otherwise serialization won't
// work.
type WorkflowResult struct {
	// Msg is a human-readable description of the outcome.
	Msg string
	// Success reports whether the optimization goal was reached.
	Success bool
}
// ActivityOptions is the set of activity options applied by both the parent
// and the child workflow in this package; it can be reused.
var ActivityOptions = workflow.ActivityOptions{
	// Failed activities are retried with exponential backoff, starting at one
	// second, capped at one minute, for at most five attempts.
	RetryPolicy: &temporal.RetryPolicy{
		MaximumAttempts:    5,
		MaximumInterval:    time.Minute,
		BackoffCoefficient: 2.0,
		InitialInterval:    time.Second,
	},
	// Such a short timeout to make the sample fail over very fast.
	HeartbeatTimeout:    2 * time.Second,
	StartToCloseTimeout: 10 * time.Minute,
}
// ContinueAsNewStr is the error text PSOChildWorkflow looks for in the error
// returned by swarm.Run to decide that the run should continue as new.
const ContinueAsNewStr = "CONTINUEASNEW"
// PSOWorkflow is the parent workflow definition. It tries to optimize the
// function named functionName by building a new swarm and running it in a
// child workflow, for up to NumberOfAttempts attempts.
//
// It registers a query handler for query type "child" that returns the
// workflow ID of the most recently started child workflow.
//
// It returns a human-readable summary and a nil error when an attempt
// succeeds or when all attempts finish without reaching the goal. If
// registering the query handler, creating the swarm, generating the child
// workflow ID, or running the child workflow fails, the error is returned.
func PSOWorkflow(ctx workflow.Context, functionName string) (string, error) {
	logger := workflow.GetLogger(ctx)
	logger.Info(fmt.Sprintf("Optimizing function %s", functionName))

	// Set activity options
	ctx = workflow.WithActivityOptions(ctx, ActivityOptions)

	// Setup query handler for query type "child"
	var childWorkflowID string
	err := workflow.SetQueryHandler(ctx, "child", func() (string, error) {
		return childWorkflowID, nil
	})
	if err != nil {
		// Concatenate instead of using the error text as a format string, so
		// a '%' inside the error cannot garble the message.
		msg := "SetQueryHandler failed: " + err.Error()
		logger.Error(msg)
		return msg, err
	}

	// Each attempt builds a new swarm from the same settings.
	settings := PSODefaultSettings(functionName)
	const NumberOfAttempts = 5
	// Attempts are numbered 1..NumberOfAttempts inclusive, so the count in the
	// final message matches the number of attempts actually made.
	// NOTE(review): the number of attempts is part of the workflow's command
	// sequence; if histories recorded with a different bound must remain
	// replayable, guard this with workflow.GetVersion.
	for i := 1; i <= NumberOfAttempts; i++ {
		logger.Info(fmt.Sprintf("Attempt #%d", i))

		swarm, err := NewSwarm(ctx, settings)
		if err != nil {
			msg := "Optimization failed. " + err.Error()
			logger.Error(msg)
			return msg, err
		}

		// Set child workflow options
		// Parent workflow can choose to specify its own ID for child execution. Make sure they are unique for each execution.
		// The ID is generated inside a SideEffect so the random value is
		// recorded once rather than regenerated.
		wid := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
			return "PSO_Child_" + uuid.New()
		})
		err = wid.Get(&childWorkflowID)
		if err != nil {
			return "", err
		}
		cwo := workflow.ChildWorkflowOptions{
			WorkflowID:          childWorkflowID,
			WorkflowRunTimeout:  time.Minute,
			WorkflowTaskTimeout: time.Minute,
		}
		// Derive a per-attempt context instead of overwriting ctx, so child
		// options from one attempt do not carry over into the next.
		childCtx := workflow.WithChildOptions(ctx, cwo)

		childWorkflowFuture := workflow.ExecuteChildWorkflow(childCtx, PSOChildWorkflow, *swarm, 1)
		var result WorkflowResult
		err = childWorkflowFuture.Get(childCtx, &result) // This blocks until the child workflow has finished
		if err != nil {
			msg := "Parent execution received child execution failure. " + err.Error()
			logger.Error(msg)
			return msg, err
		}
		if result.Success {
			msg := fmt.Sprintf("Optimization was successful at attempt #%d. %s", i, result.Msg)
			logger.Info(msg)
			return msg, nil
		}
	}

	msg := fmt.Sprintf("Unable to reach goal after %d attempts", NumberOfAttempts)
	logger.Info(msg)
	return msg, nil
}
// PSOChildWorkflow is the child workflow definition. It runs the optimization
// loop of swarm beginning at startingStep.
//
// The returned WorkflowResult has Success set to true only if the fitness of
// the position returned by swarm.Run is below the goal stored in the swarm's
// settings. If swarm.Run fails with an error whose text equals
// ContinueAsNewStr, the workflow continues as new from the step after the one
// reported by swarm.Run. Any other error from swarm.Run is logged and
// reported to the caller as "error in swarm loop".
func PSOChildWorkflow(ctx workflow.Context, swarm Swarm, startingStep int) (WorkflowResult, error) {
	logger := workflow.GetLogger(ctx)
	logger.Info("Child workflow execution started.")

	// Set activity options
	ctx = workflow.WithActivityOptions(ctx, ActivityOptions)

	// Run real optimization loop
	result, err := swarm.Run(ctx, startingStep)
	if err != nil {
		if err.Error() == ContinueAsNewStr {
			return WorkflowResult{"NewContinueAsNewError", false}, workflow.NewContinueAsNewError(ctx, PSOChildWorkflow, swarm, result.Step+1)
		}
		// Concatenate instead of using the error text as a format string, so
		// a '%' inside the error cannot garble the message.
		msg := "Error in swarm loop: " + err.Error()
		logger.Error(msg)
		return WorkflowResult{msg, false}, errors.New("error in swarm loop")
	}

	if result.Position.Fitness < swarm.Settings.function.Goal {
		msg := fmt.Sprintf("Yay! Goal was reached @ step %d (fitness=%.2e) :-)", result.Step, result.Position.Fitness)
		logger.Info(msg)
		return WorkflowResult{msg, true}, nil
	}

	msg := fmt.Sprintf("Goal was not reached after %d steps (fitness=%.2e) :-)", result.Step, result.Position.Fitness)
	logger.Info(msg)
	return WorkflowResult{msg, false}, nil
}