Skip to content

Commit

Permalink
Support Conditional close signal channel empty (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Jul 5, 2023
1 parent b0db228 commit 24d444f
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 25 deletions.
49 changes: 39 additions & 10 deletions integ/conditional_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowCadenceContinueAs
}

func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, false)
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, true)
}

func doTestConditionalForceCompleteOnChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool) {
assertions := assert.New(t)
// start test workflow server
wfHandler := conditionalClose.NewHandler()
Expand All @@ -72,9 +77,13 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
})

// start a workflow
wfId := conditionalClose.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
channelType := "_internal_channel_"
if useSignalChannel {
channelType = "_signal_channel_"
}
wfId := conditionalClose.WorkflowType + channelType + strconv.Itoa(int(time.Now().UnixNano()))
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: conditionalClose.WorkflowType,
WorkflowTimeoutSeconds: 20,
Expand All @@ -83,19 +92,35 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
},
}).Execute()
}
if useSignalChannel {
startReq.StateInput = &iwfidl.EncodedObject{
Data: iwfidl.PtrString("use-signal-channel"),
} // this will tell the workflow to use signal
}

_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

// wait for a second so that query handler is ready for executing PRC
time.Sleep(time.Second)
// invoke RPC to send 1 messages to the internal channel to unblock the waitUntil
// then send another two messages
reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
reqSignal := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
for i := 0; i < 3; i++ {
_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
WorkflowId: wfId,
RpcName: conditionalClose.RpcPublishInternalChannel,
}).Execute()
if useSignalChannel {
httpResp, err = reqSignal.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: conditionalClose.TestChannelName,
}).Execute()
} else {
_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
WorkflowId: wfId,
RpcName: conditionalClose.RpcPublishInternalChannel,
}).Execute()
}

panicAtHttpError(err, httpResp)
if i == 0 {
// wait for a second so that the workflow is in execute state
Expand All @@ -111,11 +136,15 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
panicAtHttpError(err, httpResp)

history, _ := wfHandler.GetTestResult()
assertions.Equalf(map[string]int64{

expectMap := map[string]int64{
"S1_start": 3,
"S1_decide": 3,
conditionalClose.RpcPublishInternalChannel: 3,
}, history, "rpc test fail, %v", history)
}
if !useSignalChannel {
expectMap[conditionalClose.RpcPublishInternalChannel] = 3
}
assertions.Equalf(expectMap, history, "rpc test fail, %v", history)

assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus())
assertions.Equal(1, len(resp2.GetResults()))
Expand Down
51 changes: 38 additions & 13 deletions integ/workflow/conditional_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
WorkflowType = "conditional_close"
RpcPublishInternalChannel = "publish_internal_channel"

InternalChannelName = "test-channel-name"
TestChannelName = "test-channel-name"

State1 = "S1"
)
Expand Down Expand Up @@ -48,7 +48,7 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) {
c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{
PublishToInterStateChannel: []iwfidl.InterStateChannelPublishing{
{
ChannelName: InternalChannelName,
ChannelName: TestChannelName,
},
},
})
Expand All @@ -67,15 +67,28 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if req.GetWorkflowStateId() == State1 {

c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
InterStateChannelCommands: []iwfidl.InterStateChannelCommand{
cmdReq := &iwfidl.CommandRequest{
InterStateChannelCommands: []iwfidl.InterStateChannelCommand{
{
ChannelName: TestChannelName,
},
},
CommandWaitingType: ptr.Any(iwfidl.ANY_COMPLETED),
}
input := req.GetStateInput()
if input.GetData() == "use-signal-channel" {
// use signal
cmdReq = &iwfidl.CommandRequest{
SignalCommands: []iwfidl.SignalCommand{
{
ChannelName: InternalChannelName,
SignalChannelName: TestChannelName,
},
},
CommandWaitingType: ptr.Any(iwfidl.ANY_COMPLETED),
},
}
}
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: cmdReq,
})
return
}
Expand All @@ -102,18 +115,30 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
time.Sleep(time.Second * 3)
}

conditionalClose := &iwfidl.WorkflowConditionalClose{
ConditionalCloseType: iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY.Ptr(),
ChannelName: iwfidl.PtrString(TestChannelName),
CloseInput: &TestInput,
}
input := req.GetStateInput()
if input.GetData() == "use-signal-channel" {
// use signal
conditionalClose = &iwfidl.WorkflowConditionalClose{
ConditionalCloseType: iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY.Ptr(),
ChannelName: iwfidl.PtrString(TestChannelName),
CloseInput: &TestInput,
}
}

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State1,
StateId: State1,
StateInput: req.StateInput,
},
},
ConditionalClose: &iwfidl.WorkflowConditionalClose{
ConditionalCloseType: iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY.Ptr(),
ChannelName: iwfidl.PtrString(InternalChannelName),
CloseInput: &TestInput,
},
ConditionalClose: conditionalClose,
},
})
return
Expand Down
15 changes: 13 additions & 2 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func checkClosingWorkflow(
) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) {
if decision.HasConditionalClose() {
conditionClose := decision.ConditionalClose
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY {
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY ||
conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY {
// trigger a signal draining so that all the signal/internal channel messages are processed
// TODO https://github.com/indeedeng/iwf/issues/289
// https://github.com/indeedeng/iwf/issues/290
Expand All @@ -290,7 +291,17 @@ func checkClosingWorkflow(
// to workaround, user code will have to use persistence locking
signalReceiver.DrainAllUnreceivedSignals(ctx)

if !internalChannel.HasData(conditionClose.GetChannelName()) {
conditionMet := false
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY &&
!internalChannel.HasData(conditionClose.GetChannelName()) {
conditionMet = true
}
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY &&
!signalReceiver.HasSignal(conditionClose.GetChannelName()) {
conditionMet = true
}

if conditionMet {
// condition is met, force complete the workflow
forceComplete = true
completeOutput = &iwfidl.StateCompletionOutput{
Expand Down

0 comments on commit 24d444f

Please sign in to comment.