Skip to content

Commit

Permalink
Done all
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Jul 5, 2023
1 parent 55c744e commit 920991e
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 13 deletions.
47 changes: 37 additions & 10 deletions integ/conditional_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowCadenceContinueAs
}

func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, false)
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, true)
}

func doTestConditionalForceCompleteOnChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool) {
assertions := assert.New(t)
// start test workflow server
wfHandler := conditionalClose.NewHandler()
Expand All @@ -72,9 +77,13 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
})

// start a workflow
wfId := conditionalClose.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
channelType := "_internal_channel_"
if useSignalChannel {
channelType = "_signal_channel_"
}
wfId := conditionalClose.WorkflowType + channelType + strconv.Itoa(int(time.Now().UnixNano()))
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: conditionalClose.WorkflowType,
WorkflowTimeoutSeconds: 20,
Expand All @@ -83,19 +92,33 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
},
}).Execute()
}
if useSignalChannel {
startReq.StateInput = &iwfidl.EncodedObject{} // this will tell the workflow to use signal
}

_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

// wait for a second so that query handler is ready for executing PRC
time.Sleep(time.Second)
// invoke RPC to send 1 messages to the internal channel to unblock the waitUntil
// then send another two messages
reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
reqSignal := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
for i := 0; i < 3; i++ {
_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
WorkflowId: wfId,
RpcName: conditionalClose.RpcPublishInternalChannel,
}).Execute()
if useSignalChannel {
httpResp, err = reqSignal.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: conditionalClose.TestChannelName,
}).Execute()
} else {
_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
WorkflowId: wfId,
RpcName: conditionalClose.RpcPublishInternalChannel,
}).Execute()
}

panicAtHttpError(err, httpResp)
if i == 0 {
// wait for a second so that the workflow is in execute state
Expand All @@ -111,11 +134,15 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
panicAtHttpError(err, httpResp)

history, _ := wfHandler.GetTestResult()
assertions.Equalf(map[string]int64{

expectMap := map[string]int64{
"S1_start": 3,
"S1_decide": 3,
conditionalClose.RpcPublishInternalChannel: 3,
}, history, "rpc test fail, %v", history)
}
if !useSignalChannel {
expectMap[conditionalClose.RpcPublishInternalChannel] = 3
}
assertions.Equalf(expectMap, history, "rpc test fail, %v", history)

assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus())
assertions.Equal(1, len(resp2.GetResults()))
Expand Down
3 changes: 2 additions & 1 deletion integ/workflow/conditional_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State1,
StateId: State1,
StateInput: req.StateInput,
},
},
ConditionalClose: conditionalClose,
Expand Down
15 changes: 13 additions & 2 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func checkClosingWorkflow(
) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) {
if decision.HasConditionalClose() {
conditionClose := decision.ConditionalClose
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY {
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY ||
conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY {
// trigger a signal draining so that all the signal/internal channel messages are processed
// TODO https://github.com/indeedeng/iwf/issues/289
// https://github.com/indeedeng/iwf/issues/290
Expand All @@ -290,7 +291,17 @@ func checkClosingWorkflow(
// to workaround, user code will have to use persistence locking
signalReceiver.DrainAllUnreceivedSignals(ctx)

if !internalChannel.HasData(conditionClose.GetChannelName()) {
conditionMet := false
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY &&
!internalChannel.HasData(conditionClose.GetChannelName()) {
conditionMet = true
}
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY &&
!signalReceiver.HasSignal(conditionClose.GetChannelName()) {
conditionMet = true
}

if conditionMet {
// condition is met, force complete the workflow
forceComplete = true
completeOutput = &iwfidl.StateCompletionOutput{
Expand Down

0 comments on commit 920991e

Please sign in to comment.