Skip to content

Commit

Permalink
Refactor StateOptions and execute failure policy (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Nov 28, 2023
1 parent c17ff97 commit 424afc8
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 85 deletions.
20 changes: 10 additions & 10 deletions integ/execute_api_fail_recovery_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ import (
"github.com/indeedeng/iwf-golang-sdk/iwf"
)

type executeApiFailRecoveryWorkflowState1 struct{}
type executeApiFailRecoveryWorkflowState1 struct {
iwf.WorkflowStateDefaultsNoWaitUntil
}

func (b executeApiFailRecoveryWorkflowState1) GetStateId() string {
return "execute_api_fail_recovery_workflow_state1"
}

func (b executeApiFailRecoveryWorkflowState1) GetStateOptions() *iwfidl.WorkflowStateOptions {
options := iwf.NewWorkflowStateOptionsExtension(nil).SetProceedOnExecuteFailure(executeApiFailRecoveryWorkflowState2{}, nil)
options.ExecuteApiRetryPolicy = &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(1),
MaximumAttempts: iwfidl.PtrInt32(1),
func (b executeApiFailRecoveryWorkflowState1) GetStateOptions() *iwf.StateOptions {
options := &iwf.StateOptions{
ExecuteApiRetryPolicy: &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(1),
MaximumAttempts: iwfidl.PtrInt32(1),
},
ExecuteApiFailureProceedState: &executeApiFailRecoveryWorkflowState2{},
}

return options
}

func (b executeApiFailRecoveryWorkflowState1) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
return iwf.EmptyCommandRequest(), nil
}

func (b executeApiFailRecoveryWorkflowState1) Execute(ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
return nil, errors.New("error")
}
10 changes: 1 addition & 9 deletions integ/execute_api_fail_recovery_workflow_state2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@ import (
)

type executeApiFailRecoveryWorkflowState2 struct {
iwf.DefaultStateOptions
}

func (b executeApiFailRecoveryWorkflowState2) GetStateId() string {
return "execute_api_fail_recovery_workflow_state2"
}

func (b executeApiFailRecoveryWorkflowState2) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
return iwf.EmptyCommandRequest(), nil
iwf.WorkflowStateDefaultsNoWaitUntil
}

func (b executeApiFailRecoveryWorkflowState2) Execute(ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
Expand Down
4 changes: 2 additions & 2 deletions integ/proceed_on_state_start_fail_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (b *proceedOnStateStartFailWorkflowState1) Execute(ctx iwf.WorkflowContext,
return iwf.SingleNextState(&proceedOnStateStartFailWorkflowState2{}, b.output), nil
}

func (b *proceedOnStateStartFailWorkflowState1) GetStateOptions() *iwfidl.WorkflowStateOptions {
return &iwfidl.WorkflowStateOptions{
func (b *proceedOnStateStartFailWorkflowState1) GetStateOptions() *iwf.StateOptions {
return &iwf.StateOptions{
WaitUntilApiRetryPolicy: &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(1),
MaximumAttempts: iwfidl.PtrInt32(2),
Expand Down
4 changes: 2 additions & 2 deletions integ/state_api_fail_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func (b stateApiFailWorkflowState1) Execute(ctx iwf.WorkflowContext, input iwf.O
return iwf.ForceFailWorkflow("a failing message"), nil
}

func (b stateApiFailWorkflowState1) GetStateOptions() *iwfidl.WorkflowStateOptions {
return &iwfidl.WorkflowStateOptions{
func (b stateApiFailWorkflowState1) GetStateOptions() *iwf.StateOptions {
return &iwf.StateOptions{
WaitUntilApiRetryPolicy: &iwfidl.RetryPolicy{
MaximumAttempts: iwfidl.PtrInt32(1),
},
Expand Down
4 changes: 2 additions & 2 deletions integ/state_api_timeout_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func (b stateApiTimeoutWorkflowState1) Execute(ctx iwf.WorkflowContext, input iw
return iwf.ForceFailWorkflow("a failing message"), nil
}

func (b stateApiTimeoutWorkflowState1) GetStateOptions() *iwfidl.WorkflowStateOptions {
return &iwfidl.WorkflowStateOptions{
func (b stateApiTimeoutWorkflowState1) GetStateOptions() *iwf.StateOptions {
return &iwf.StateOptions{
WaitUntilApiRetryPolicy: &iwfidl.RetryPolicy{
MaximumAttempts: iwfidl.PtrInt32(1),
},
Expand Down
11 changes: 1 addition & 10 deletions iwf/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,7 @@ func (c *clientImpl) StartWorkflow(ctx context.Context, workflow ObjectWorkflow,
if state != nil {
startStateId = GetFinalWorkflowStateId(state)
startStateOpt := state.GetStateOptions()
if ShouldSkipWaitUntilAPI(state) {
if startStateOpt == nil {
startStateOpt = &iwfidl.WorkflowStateOptions{
SkipWaitUntil: ptr.Any(true),
}
} else {
startStateOpt.SkipWaitUntil = ptr.Any(true)
}
}
unregOpt.StartStateOptions = startStateOpt
unregOpt.StartStateOptions = toIdlStateOptions(ShouldSkipWaitUntilAPI(state), startStateOpt)
}

if options != nil {
Expand Down
45 changes: 35 additions & 10 deletions iwf/internal_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,7 @@ func toIdlDecision(from *StateDecision, wfType string, registry Registry, encode
var options *iwfidl.WorkflowStateOptions
if !strings.HasPrefix(fromMv.NextStateId, ReservedStateIdPrefix) {
stateDef := registry.getWorkflowStateDef(wfType, fromMv.NextStateId)
options = stateDef.State.GetStateOptions()
if ShouldSkipWaitUntilAPI(stateDef.State) {
if options == nil {
options = &iwfidl.WorkflowStateOptions{
SkipWaitUntil: ptr.Any(true),
}
} else {
options.SkipWaitUntil = ptr.Any(true)
}
}
options = toIdlStateOptions(ShouldSkipWaitUntilAPI(stateDef.State), stateDef.State.GetStateOptions())
}
mv := iwfidl.StateMovement{
StateId: fromMv.NextStateId,
Expand All @@ -125,3 +116,37 @@ func toIdlDecision(from *StateDecision, wfType string, registry Registry, encode
NextStates: mvs,
}, nil
}

func toIdlStateOptions(skipWaitUntil bool, stateOptions *StateOptions) *iwfidl.WorkflowStateOptions {
if stateOptions == nil {
stateOptions = &StateOptions{}
}

idlStOptions := &iwfidl.WorkflowStateOptions{
SearchAttributesLoadingPolicy: stateOptions.SearchAttributesLoadingPolicy,
DataAttributesLoadingPolicy: stateOptions.DataAttributesLoadingPolicy,
WaitUntilApiTimeoutSeconds: stateOptions.WaitUntilApiTimeoutSeconds,
ExecuteApiTimeoutSeconds: stateOptions.ExecuteApiTimeoutSeconds,
WaitUntilApiRetryPolicy: stateOptions.WaitUntilApiRetryPolicy,
ExecuteApiRetryPolicy: stateOptions.ExecuteApiRetryPolicy,
WaitUntilApiFailurePolicy: stateOptions.WaitUntilApiFailurePolicy,
}

if skipWaitUntil {
idlStOptions.SkipWaitUntil = ptr.Any(true)
}

if stateOptions.ExecuteApiFailureProceedState != nil {
idlStOptions.ExecuteApiFailurePolicy = iwfidl.PROCEED_TO_CONFIGURED_STATE.Ptr()
idlStOptions.ExecuteApiFailureProceedStateId = ptr.Any(GetFinalWorkflowStateId(stateOptions.ExecuteApiFailureProceedState))

proceedStateOptions := stateOptions.ExecuteApiFailureProceedState.GetStateOptions()
if proceedStateOptions != nil && proceedStateOptions.ExecuteApiFailureProceedState != nil {
panic("nested failure handling/recovery is not supported: ExecuteApiFailureProceedState cannot have ExecuteApiFailureProceedState")
}
idlStOptions.ExecuteApiFailureProceedStateOptions =
toIdlStateOptions(ShouldSkipWaitUntilAPI(stateOptions.ExecuteApiFailureProceedState), proceedStateOptions)
}

return idlStOptions
}
16 changes: 16 additions & 0 deletions iwf/state_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package iwf

import (
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
)

type StateOptions struct {
SearchAttributesLoadingPolicy *iwfidl.PersistenceLoadingPolicy
DataAttributesLoadingPolicy *iwfidl.PersistenceLoadingPolicy
WaitUntilApiTimeoutSeconds *int32
ExecuteApiTimeoutSeconds *int32
WaitUntilApiRetryPolicy *iwfidl.RetryPolicy
ExecuteApiRetryPolicy *iwfidl.RetryPolicy
WaitUntilApiFailurePolicy *iwfidl.WaitUntilApiFailurePolicy
ExecuteApiFailureProceedState WorkflowState
}
5 changes: 2 additions & 3 deletions iwf/workflow_state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package iwf

import (
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
"reflect"
)

Expand Down Expand Up @@ -51,7 +50,7 @@ type WorkflowState interface {

// GetStateOptions can just return nil to use the default Options
// StateOptions is optional configuration to adjust the state behaviors
GetStateOptions() *iwfidl.WorkflowStateOptions
GetStateOptions() *StateOptions
}

// GetFinalWorkflowStateId returns the stateId that will be registered and used
Expand Down Expand Up @@ -95,7 +94,7 @@ func (d DefaultStateId) GetStateId() string {

type DefaultStateOptions struct{}

func (d DefaultStateOptions) GetStateOptions() *iwfidl.WorkflowStateOptions {
func (d DefaultStateOptions) GetStateOptions() *StateOptions {
return nil
}

Expand Down
37 changes: 0 additions & 37 deletions iwf/workflow_state_options_extension.go

This file was deleted.

0 comments on commit 424afc8

Please sign in to comment.