Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Conditional close signal channel empty #294

Merged
merged 4 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 39 additions & 10 deletions integ/conditional_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowCadenceContinueAs
}

func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, false)
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, true)
}

func doTestConditionalForceCompleteOnChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool) {
assertions := assert.New(t)
// start test workflow server
wfHandler := conditionalClose.NewHandler()
Expand All @@ -72,9 +77,13 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
})

// start a workflow
wfId := conditionalClose.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
channelType := "_internal_channel_"
if useSignalChannel {
channelType = "_signal_channel_"
}
wfId := conditionalClose.WorkflowType + channelType + strconv.Itoa(int(time.Now().UnixNano()))
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: conditionalClose.WorkflowType,
WorkflowTimeoutSeconds: 20,
Expand All @@ -83,19 +92,35 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
},
}).Execute()
}
if useSignalChannel {
startReq.StateInput = &iwfidl.EncodedObject{
Data: iwfidl.PtrString("use-signal-channel"),
} // this will tell the workflow to use signal
}

_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

// wait for a second so that query handler is ready for executing PRC
time.Sleep(time.Second)
// invoke RPC to send 1 messages to the internal channel to unblock the waitUntil
// then send another two messages
reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
reqSignal := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
for i := 0; i < 3; i++ {
_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
WorkflowId: wfId,
RpcName: conditionalClose.RpcPublishInternalChannel,
}).Execute()
if useSignalChannel {
httpResp, err = reqSignal.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: conditionalClose.TestChannelName,
}).Execute()
} else {
_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
WorkflowId: wfId,
RpcName: conditionalClose.RpcPublishInternalChannel,
}).Execute()
}

panicAtHttpError(err, httpResp)
if i == 0 {
// wait for a second so that the workflow is in execute state
Expand All @@ -111,11 +136,15 @@ func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T,
panicAtHttpError(err, httpResp)

history, _ := wfHandler.GetTestResult()
assertions.Equalf(map[string]int64{

expectMap := map[string]int64{
"S1_start": 3,
"S1_decide": 3,
conditionalClose.RpcPublishInternalChannel: 3,
}, history, "rpc test fail, %v", history)
}
if !useSignalChannel {
expectMap[conditionalClose.RpcPublishInternalChannel] = 3
}
assertions.Equalf(expectMap, history, "rpc test fail, %v", history)

assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus())
assertions.Equal(1, len(resp2.GetResults()))
Expand Down
51 changes: 38 additions & 13 deletions integ/workflow/conditional_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
WorkflowType = "conditional_close"
RpcPublishInternalChannel = "publish_internal_channel"

InternalChannelName = "test-channel-name"
TestChannelName = "test-channel-name"

State1 = "S1"
)
Expand Down Expand Up @@ -48,7 +48,7 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) {
c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{
PublishToInterStateChannel: []iwfidl.InterStateChannelPublishing{
{
ChannelName: InternalChannelName,
ChannelName: TestChannelName,
},
},
})
Expand All @@ -67,15 +67,28 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if req.GetWorkflowStateId() == State1 {

c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
InterStateChannelCommands: []iwfidl.InterStateChannelCommand{
cmdReq := &iwfidl.CommandRequest{
InterStateChannelCommands: []iwfidl.InterStateChannelCommand{
{
ChannelName: TestChannelName,
},
},
CommandWaitingType: ptr.Any(iwfidl.ANY_COMPLETED),
}
input := req.GetStateInput()
if input.GetData() == "use-signal-channel" {
// use signal
cmdReq = &iwfidl.CommandRequest{
SignalCommands: []iwfidl.SignalCommand{
{
ChannelName: InternalChannelName,
SignalChannelName: TestChannelName,
},
},
CommandWaitingType: ptr.Any(iwfidl.ANY_COMPLETED),
},
}
}
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: cmdReq,
})
return
}
Expand All @@ -102,18 +115,30 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
time.Sleep(time.Second * 3)
}

conditionalClose := &iwfidl.WorkflowConditionalClose{
ConditionalCloseType: iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY.Ptr(),
ChannelName: iwfidl.PtrString(TestChannelName),
CloseInput: &TestInput,
}
input := req.GetStateInput()
if input.GetData() == "use-signal-channel" {
// use signal
conditionalClose = &iwfidl.WorkflowConditionalClose{
ConditionalCloseType: iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY.Ptr(),
ChannelName: iwfidl.PtrString(TestChannelName),
CloseInput: &TestInput,
}
}

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State1,
StateId: State1,
StateInput: req.StateInput,
},
},
ConditionalClose: &iwfidl.WorkflowConditionalClose{
ConditionalCloseType: iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY.Ptr(),
ChannelName: iwfidl.PtrString(InternalChannelName),
CloseInput: &TestInput,
},
ConditionalClose: conditionalClose,
},
})
return
Expand Down
15 changes: 13 additions & 2 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func checkClosingWorkflow(
) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) {
if decision.HasConditionalClose() {
conditionClose := decision.ConditionalClose
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY {
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY ||
conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY {
// trigger a signal draining so that all the signal/internal channel messages are processed
// TODO https://github.com/indeedeng/iwf/issues/289
// https://github.com/indeedeng/iwf/issues/290
Expand All @@ -290,7 +291,17 @@ func checkClosingWorkflow(
// to workaround, user code will have to use persistence locking
signalReceiver.DrainAllUnreceivedSignals(ctx)

if !internalChannel.HasData(conditionClose.GetChannelName()) {
conditionMet := false
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY &&
!internalChannel.HasData(conditionClose.GetChannelName()) {
conditionMet = true
}
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY &&
!signalReceiver.HasSignal(conditionClose.GetChannelName()) {
conditionMet = true
}

if conditionMet {
// condition is met, force complete the workflow
forceComplete = true
completeOutput = &iwfidl.StateCompletionOutput{
Expand Down