Skip to content

Commit

Permalink
Fix bug of missing IwfWorkflowType system search attribute when no wo…
Browse files Browse the repository at this point in the history
…rkflowStartOptions (#309)
  • Loading branch information
longquanzheng committed Jul 27, 2023
1 parent 34052f7 commit aae2109
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 5 deletions.
77 changes: 77 additions & 0 deletions integ/start_with_no_options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package integ

import (
"context"
"github.com/indeedeng/iwf/service/common/ptr"
"strconv"
"testing"
"time"

"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/basic"
"github.com/indeedeng/iwf/service"
"github.com/stretchr/testify/assert"
)

func TestStartWorkflowNoOptionsTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
doTestStartWorkflowWithoutStartOptions(t, service.BackendTypeTemporal)
}

func TestStartWorkflowNoOptionsCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
doTestStartWorkflowWithoutStartOptions(t, service.BackendTypeCadence)
}

func doTestStartWorkflowWithoutStartOptions(t *testing.T, backendType service.BackendType) {
if !*cadenceIntegTest {
t.Skip()
}

wfHandler := basic.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

client, closeFunc2 := startIwfServiceWithClient(backendType)
defer closeFunc2()

// start a workflow
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
wfId := "TestStartWorkflowWithoutStartOptions" + strconv.Itoa(int(time.Now().UnixNano()))
wfInput := &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString("test data"),
}
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: basic.WorkflowType,
WorkflowTimeoutSeconds: 100,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(basic.State1),
StateInput: wfInput,
}
_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

requestedSAs := []iwfidl.SearchAttributeKeyAndType{
{
Key: ptr.Any(service.SearchAttributeIwfWorkflowType),
ValueType: iwfidl.KEYWORD.Ptr(),
},
}
response, err := client.DescribeWorkflowExecution(context.Background(), wfId, "", requestedSAs)
assertions := assert.New(t)
attribute := response.SearchAttributes[service.SearchAttributeIwfWorkflowType]
assertions.Equal(basic.WorkflowType, attribute.GetStringValue())
}
15 changes: 10 additions & 5 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"github.com/indeedeng/iwf/service/common/compatibility"
"github.com/indeedeng/iwf/service/common/urlautofix"
"github.com/indeedeng/iwf/service/common/utils"
"io/ioutil"
"math"
"net/http"
Expand Down Expand Up @@ -80,9 +81,12 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor
ID: req.GetWorkflowId(),
TaskQueue: s.taskQueue,
WorkflowExecutionTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second,
SearchAttributes: map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
},
}

var initSAs []iwfidl.SearchAttribute
var initCustomSAs []iwfidl.SearchAttribute
workflowConfig := s.config.Interpreter.DefaultWorkflowConfig

useMemo := false
Expand All @@ -92,12 +96,13 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor
workflowOptions.CronSchedule = startOptions.CronSchedule
workflowOptions.RetryPolicy = startOptions.RetryPolicy
var err error
workflowOptions.SearchAttributes, err = mapper.MapToInternalSearchAttributes(startOptions.SearchAttributes)
initialCustomSAInternal, err := mapper.MapToInternalSearchAttributes(startOptions.SearchAttributes)
if err != nil {
return nil, s.handleError(err)
}
workflowOptions.SearchAttributes[service.SearchAttributeIwfWorkflowType] = req.IwfWorkflowType
initSAs = startOptions.SearchAttributes
workflowOptions.SearchAttributes = utils.MergeMap(initialCustomSAInternal, workflowOptions.SearchAttributes)

initCustomSAs = startOptions.SearchAttributes
if startOptions.HasWorkflowConfigOverride() {
workflowConfig = startOptions.GetWorkflowConfigOverride()
}
Expand All @@ -117,7 +122,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor
StartStateId: req.StartStateId,
StateInput: req.GetStateInput(),
StateOptions: req.GetStateOptions(),
InitSearchAttributes: initSAs,
InitSearchAttributes: initCustomSAs,
Config: workflowConfig,
UseMemoForDataAttributes: useMemo,
}
Expand Down
13 changes: 13 additions & 0 deletions service/common/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package utils

func MergeMap(first map[string]interface{}, second map[string]interface{}) map[string]interface{} {
out := make(map[string]interface{}, len(first))
for k, v := range first {
out[k] = v
}

for k, v := range second {
out[k] = v
}
return out
}

0 comments on commit aae2109

Please sign in to comment.