diff --git a/integ/start_with_no_options_test.go b/integ/start_with_no_options_test.go new file mode 100644 index 00000000..06d57e89 --- /dev/null +++ b/integ/start_with_no_options_test.go @@ -0,0 +1,77 @@ +package integ + +import ( + "context" + "github.com/indeedeng/iwf/service/common/ptr" + "strconv" + "testing" + "time" + + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/integ/workflow/basic" + "github.com/indeedeng/iwf/service" + "github.com/stretchr/testify/assert" +) + +func TestStartWorkflowNoOptionsTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + doTestStartWorkflowWithoutStartOptions(t, service.BackendTypeTemporal) +} + +func TestStartWorkflowNoOptionsCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + doTestStartWorkflowWithoutStartOptions(t, service.BackendTypeCadence) +} + +func doTestStartWorkflowWithoutStartOptions(t *testing.T, backendType service.BackendType) { + if !*cadenceIntegTest { + t.Skip() + } + + wfHandler := basic.NewHandler() + closeFunc1 := startWorkflowWorker(wfHandler) + defer closeFunc1() + + client, closeFunc2 := startIwfServiceWithClient(backendType) + defer closeFunc2() + + // start a workflow + apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ + Servers: []iwfidl.ServerConfiguration{ + { + URL: "http://localhost:" + testIwfServerPort, + }, + }, + }) + wfId := "TestStartWorkflowWithoutStartOptions" + strconv.Itoa(int(time.Now().UnixNano())) + wfInput := &iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString("test data"), + } + req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) + startReq := iwfidl.WorkflowStartRequest{ + WorkflowId: wfId, + IwfWorkflowType: basic.WorkflowType, + WorkflowTimeoutSeconds: 100, + IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, + StartStateId: ptr.Any(basic.State1), + StateInput: wfInput, + } + _, httpResp, err := req.WorkflowStartRequest(startReq).Execute() + panicAtHttpError(err, httpResp) + + requestedSAs := []iwfidl.SearchAttributeKeyAndType{ + { + Key: ptr.Any(service.SearchAttributeIwfWorkflowType), + ValueType: iwfidl.KEYWORD.Ptr(), + }, + } + response, err := client.DescribeWorkflowExecution(context.Background(), wfId, "", requestedSAs) + assertions := assert.New(t) + attribute := response.SearchAttributes[service.SearchAttributeIwfWorkflowType] + assertions.Equal(basic.WorkflowType, attribute.GetStringValue()) +} diff --git a/service/api/service.go b/service/api/service.go index c41686a0..76692f1d 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/indeedeng/iwf/service/common/compatibility" "github.com/indeedeng/iwf/service/common/urlautofix" + "github.com/indeedeng/iwf/service/common/utils" "io/ioutil" "math" "net/http" @@ -80,9 +81,12 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor ID: req.GetWorkflowId(), TaskQueue: s.taskQueue, WorkflowExecutionTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second, + SearchAttributes: map[string]interface{}{ + service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType, + }, } - var initSAs []iwfidl.SearchAttribute + var initCustomSAs []iwfidl.SearchAttribute workflowConfig := s.config.Interpreter.DefaultWorkflowConfig useMemo := false @@ -92,12 +96,13 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor workflowOptions.CronSchedule = startOptions.CronSchedule workflowOptions.RetryPolicy = startOptions.RetryPolicy var err error - workflowOptions.SearchAttributes, err = mapper.MapToInternalSearchAttributes(startOptions.SearchAttributes) + initialCustomSAInternal, err := mapper.MapToInternalSearchAttributes(startOptions.SearchAttributes) if err != nil { return nil, s.handleError(err) } - workflowOptions.SearchAttributes[service.SearchAttributeIwfWorkflowType] = req.IwfWorkflowType - initSAs = startOptions.SearchAttributes + workflowOptions.SearchAttributes = utils.MergeMap(initialCustomSAInternal, workflowOptions.SearchAttributes) + + initCustomSAs = startOptions.SearchAttributes if startOptions.HasWorkflowConfigOverride() { workflowConfig = startOptions.GetWorkflowConfigOverride() } @@ -117,7 +122,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor StartStateId: req.StartStateId, StateInput: req.GetStateInput(), StateOptions: req.GetStateOptions(), - InitSearchAttributes: initSAs, + InitSearchAttributes: initCustomSAs, Config: workflowConfig, UseMemoForDataAttributes: useMemo, } diff --git a/service/common/utils/utils.go b/service/common/utils/utils.go new file mode 100644 index 00000000..22926bf7 --- /dev/null +++ b/service/common/utils/utils.go @@ -0,0 +1,13 @@ +package utils + +func MergeMap(first map[string]interface{}, second map[string]interface{}) map[string]interface{} { + out := make(map[string]interface{}, len(first)) + for k, v := range first { + out[k] = v + } + + for k, v := range second { + out[k] = v + } + return out +}