diff --git a/integ/timer_test.go b/integ/timer_test.go index 45592b95..e62dd843 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -98,12 +98,12 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * } assertions := assert.New(t) timer2 := &service.TimerInfo{ - CommandId: "timer-cmd-id-2", + CommandId: ptr.Any("timer-cmd-id-2"), FiringUnixTimestampSeconds: nowTimestamp + 86400, Status: service.TimerPending, } timer3 := &service.TimerInfo{ - CommandId: "timer-cmd-id-3", + CommandId: ptr.Any("timer-cmd-id-3"), FiringUnixTimestampSeconds: nowTimestamp + 86400*365, Status: service.TimerPending, } @@ -111,7 +111,7 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * StateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{ "S1-1": { { - CommandId: "timer-cmd-id", + CommandId: ptr.Any("timer-cmd-id"), FiringUnixTimestampSeconds: nowTimestamp + 10, Status: service.TimerPending, }, diff --git a/integ/wait_for_state_completion_test.go b/integ/wait_for_state_completion_test.go index f408ca38..b23e68f5 100644 --- a/integ/wait_for_state_completion_test.go +++ b/integ/wait_for_state_completion_test.go @@ -86,7 +86,7 @@ func doTestWaitForStateCompletion( _, httpResp, err = req1.WorkflowWaitForStateCompletionRequest( iwfidl.WorkflowWaitForStateCompletionRequest{ WorkflowId: wfId, - StateExecutionId: "S1-1", + StateExecutionId: ptr.Any("S1-1"), WaitTimeSeconds: iwfidl.PtrInt32(30), }).Execute() panicAtHttpError(err, httpResp) diff --git a/integ/workflow/any_command_close/routers.go b/integ/workflow/any_command_close/routers.go index 12470aa7..4887850d 100644 --- a/integ/workflow/any_command_close/routers.go +++ b/integ/workflow/any_command_close/routers.go @@ -46,11 +46,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { CommandRequest: &iwfidl.CommandRequest{ SignalCommands: []iwfidl.SignalCommand{ { - CommandId: "signal-cmd-id1", + CommandId: ptr.Any("signal-cmd-id1"), SignalChannelName: SignalName1, }, { - CommandId: "signal-cmd-id2", + CommandId: ptr.Any("signal-cmd-id2"), SignalChannelName: SignalName2, }, }, diff --git a/integ/workflow/any_command_combination/routers.go b/integ/workflow/any_command_combination/routers.go index d40dab4a..bf0af376 100644 --- a/integ/workflow/any_command_combination/routers.go +++ b/integ/workflow/any_command_combination/routers.go @@ -5,6 +5,7 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/integ/workflow/common" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" "time" @@ -48,41 +49,39 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { invalidTimerCommands := []iwfidl.TimerCommand{ { - CommandId: "", FiringUnixTimestampSeconds: iwfidl.PtrInt64(time.Now().Unix() + 86400*365), // one year later }, } validTimerCommands := []iwfidl.TimerCommand{ { - CommandId: TimerId1, + CommandId: ptr.Any(TimerId1), FiringUnixTimestampSeconds: iwfidl.PtrInt64(time.Now().Unix() + 86400*365), // one year later }, } invalidSignalCommands := []iwfidl.SignalCommand{ { - CommandId: "", SignalChannelName: SignalNameAndId1, }, { - CommandId: SignalNameAndId2, + CommandId: ptr.Any(SignalNameAndId2), SignalChannelName: SignalNameAndId2, }, } validSignalCommands := []iwfidl.SignalCommand{ { - CommandId: SignalNameAndId1, + CommandId: ptr.Any(SignalNameAndId1), SignalChannelName: SignalNameAndId1, }, { - CommandId: SignalNameAndId1, + CommandId: ptr.Any(SignalNameAndId1), SignalChannelName: SignalNameAndId1, }, { - CommandId: SignalNameAndId2, + CommandId: ptr.Any(SignalNameAndId2), SignalChannelName: SignalNameAndId2, }, { - CommandId: SignalNameAndId3, + CommandId: ptr.Any(SignalNameAndId3), SignalChannelName: SignalNameAndId3, }, } diff --git a/integ/workflow/any_timer_signal/routers.go b/integ/workflow/any_timer_signal/routers.go index 1a66dd10..e0be00aa 100644 --- a/integ/workflow/any_timer_signal/routers.go +++ b/integ/workflow/any_timer_signal/routers.go @@ -57,7 +57,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { CommandRequest: &iwfidl.CommandRequest{ SignalCommands: []iwfidl.SignalCommand{ { - CommandId: "signal-cmd-id", + CommandId: ptr.Any("signal-cmd-id"), SignalChannelName: SignalName, }, }, diff --git a/integ/workflow/interstate/routers.go b/integ/workflow/interstate/routers.go index 2331c9bb..3986a3c5 100644 --- a/integ/workflow/interstate/routers.go +++ b/integ/workflow/interstate/routers.go @@ -4,6 +4,7 @@ import ( "github.com/gin-gonic/gin" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" "time" @@ -68,7 +69,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), InterStateChannelCommands: []iwfidl.InterStateChannelCommand{ { - CommandId: "cmd-1", + CommandId: ptr.Any("cmd-1"), ChannelName: channel1, }, }, @@ -82,7 +83,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), InterStateChannelCommands: []iwfidl.InterStateChannelCommand{ { - CommandId: "cmd-2", + CommandId: ptr.Any("cmd-2"), ChannelName: channel2, }, }, diff --git a/integ/workflow/signal/routers.go b/integ/workflow/signal/routers.go index 0f703d58..9f082ecf 100644 --- a/integ/workflow/signal/routers.go +++ b/integ/workflow/signal/routers.go @@ -6,6 +6,7 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/integ/workflow/common" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" ) @@ -46,19 +47,17 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { CommandRequest: &iwfidl.CommandRequest{ SignalCommands: []iwfidl.SignalCommand{ { - CommandId: "signal-cmd-id0", + CommandId: ptr.Any("signal-cmd-id0"), SignalChannelName: SignalName, }, { - CommandId: "signal-cmd-id1", + CommandId: ptr.Any("signal-cmd-id1"), SignalChannelName: SignalName, }, { - CommandId: "", SignalChannelName: SignalName, }, { - CommandId: "", SignalChannelName: SignalName, }, }, diff --git a/integ/workflow/timer/routers.go b/integ/workflow/timer/routers.go index 41c5b6ce..62553c00 100644 --- a/integ/workflow/timer/routers.go +++ b/integ/workflow/timer/routers.go @@ -1,6 +1,7 @@ package timer import ( + "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" "strconv" @@ -52,15 +53,15 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { CommandRequest: &iwfidl.CommandRequest{ TimerCommands: []iwfidl.TimerCommand{ { - CommandId: "timer-cmd-id", + CommandId: ptr.Any("timer-cmd-id"), DurationSeconds: iwfidl.PtrInt64(10), // fire after 10s }, { - CommandId: "timer-cmd-id-2", + CommandId: ptr.Any("timer-cmd-id-2"), DurationSeconds: iwfidl.PtrInt64(86400), // fire after one day }, { - CommandId: "timer-cmd-id-3", + CommandId: ptr.Any("timer-cmd-id-3"), DurationSeconds: iwfidl.PtrInt64(86400 * 365), // fire after one year }, }, diff --git a/integ/workflow/wait_for_state_completion/routers.go b/integ/workflow/wait_for_state_completion/routers.go index 9a8c246b..512c9885 100644 --- a/integ/workflow/wait_for_state_completion/routers.go +++ b/integ/workflow/wait_for_state_completion/routers.go @@ -1,6 +1,7 @@ package wait_for_state_completion import ( + "github.com/indeedeng/iwf/service/common/ptr" "log" "net/http" "strconv" @@ -52,7 +53,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { CommandRequest: &iwfidl.CommandRequest{ TimerCommands: []iwfidl.TimerCommand{ { - CommandId: "timer-cmd-id", + CommandId: ptr.Any("timer-cmd-id"), FiringUnixTimestampSeconds: iwfidl.PtrInt64(now + 10), // fire after 10s }, }, diff --git a/service/api/service.go b/service/api/service.go index c3ee94df..ccb3410c 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -151,7 +151,13 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion( ) (wresp *iwfidl.WorkflowWaitForStateCompletionResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() - workflowId := service.IwfSystemConstPrefix + req.WorkflowId + "_" + req.StateExecutionId + var workflowId string + if req.WaitForKey != nil { + workflowId = service.IwfSystemConstPrefix + req.WorkflowId + "_" + *req.WaitForKey + } else { + workflowId = service.IwfSystemConstPrefix + req.WorkflowId + "_" + *req.StateExecutionId + } + options := uclient.StartWorkflowOptions{ ID: workflowId, TaskQueue: s.taskQueue, diff --git a/service/interfaces.go b/service/interfaces.go index 19ffb00b..65acdd12 100644 --- a/service/interfaces.go +++ b/service/interfaces.go @@ -100,7 +100,7 @@ type ( } TimerInfo struct { - CommandId string + CommandId *string FiringUnixTimestampSeconds int64 Status InternalTimerStatus } @@ -186,7 +186,7 @@ func ValidateTimerSkipRequest( } if timerId != "" { for _, t := range timerInfos { - if t.CommandId == timerId { + if t.CommandId != nil && *t.CommandId == timerId { return t, true } } diff --git a/service/interpreter/deciderTriggerer.go b/service/interpreter/deciderTriggerer.go index f681ae50..fa8f02dd 100644 --- a/service/interpreter/deciderTriggerer.go +++ b/service/interpreter/deciderTriggerer.go @@ -27,15 +27,15 @@ func IsDeciderTriggerConditionMet( var completedCmdIds []string for idx := range completedTimerCmds { cmdId := commandReq.GetTimerCommands()[idx].CommandId - completedCmdIds = append(completedCmdIds, cmdId) + completedCmdIds = append(completedCmdIds, *cmdId) } for idx := range completedSignalCmds { cmdId := commandReq.GetSignalCommands()[idx].CommandId - completedCmdIds = append(completedCmdIds, cmdId) + completedCmdIds = append(completedCmdIds, *cmdId) } for idx := range completedInterStateChannelCmds { cmdId := commandReq.GetInterStateChannelCommands()[idx].CommandId - completedCmdIds = append(completedCmdIds, cmdId) + completedCmdIds = append(completedCmdIds, *cmdId) } for _, acceptedComb := range commandReq.GetCommandCombinations() { diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 6ca5bd8c..c708d672 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -4,6 +4,7 @@ import ( "context" "fmt" uclient "github.com/indeedeng/iwf/service/client" + "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/interpreter/env" "time" @@ -655,7 +656,7 @@ func executeState( status = iwfidl.FIRED } timerResults = append(timerResults, iwfidl.TimerResult{ - CommandId: cmd.GetCommandId(), + CommandId: ptr.Any(cmd.GetCommandId()), TimerStatus: status, }) } @@ -672,7 +673,7 @@ func executeState( } signalResults = append(signalResults, iwfidl.SignalResult{ - CommandId: cmd.GetCommandId(), + CommandId: ptr.Any(cmd.GetCommandId()), SignalChannelName: cmd.GetSignalChannelName(), SignalValue: result, SignalRequestStatus: status,