diff --git a/service/api/service.go b/service/api/service.go index 3a9f9264..d59b8e8c 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -5,6 +5,7 @@ import ( "crypto/md5" "encoding/json" "fmt" + "github.com/indeedeng/iwf/service/interpreter/versions" "math" "net/http" "os" @@ -61,6 +62,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( WorkflowExecutionTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second, SearchAttributes: map[string]interface{}{ service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType, + service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions, }, } diff --git a/service/interpreter/cadence/workflowProvider.go b/service/interpreter/cadence/workflowProvider.go index c4085157..0cd69f3a 100644 --- a/service/interpreter/cadence/workflowProvider.go +++ b/service/interpreter/cadence/workflowProvider.go @@ -2,6 +2,7 @@ package cadence import ( "fmt" + "github.com/indeedeng/iwf/service/common/mapper" "time" "github.com/indeedeng/iwf/gen/iwfidl" @@ -36,7 +37,9 @@ func (w *workflowProvider) IsApplicationError(err error) bool { return ok } -func (w *workflowProvider) NewInterpreterContinueAsNewError(ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput) error { +func (w *workflowProvider) NewInterpreterContinueAsNewError( + ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput, +) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -44,7 +47,9 @@ func (w *workflowProvider) NewInterpreterContinueAsNewError(ctx interpreter.Unif return workflow.NewContinueAsNewError(wfCtx, Interpreter, input) } -func (w *workflowProvider) UpsertSearchAttributes(ctx interpreter.UnifiedContext, attributes map[string]interface{}) error { +func (w *workflowProvider) UpsertSearchAttributes( + ctx interpreter.UnifiedContext, attributes map[string]interface{}, +) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -83,7 +88,21 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter } } -func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, queryType string, handler interface{}) error { +func (w *workflowProvider) GetSearchAttributes( + ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, +) (map[string]iwfidl.SearchAttribute, error) { + wfCtx, ok := ctx.GetContext().(workflow.Context) + if !ok { + panic("cannot convert to cadence workflow context") + } + sas := workflow.GetInfo(wfCtx).SearchAttributes + + return mapper.MapCadenceToIwfSearchAttributes(sas, requestedSearchAttributes) +} + +func (w *workflowProvider) SetQueryHandler( + ctx interpreter.UnifiedContext, queryType string, handler interface{}, +) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -92,13 +111,16 @@ func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, query } func (w *workflowProvider) SetRpcUpdateHandler( - ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler, + ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, + handler interpreter.UnifiedRpcHandler, ) error { // NOTE: this feature is not available in Cadence return nil } -func (w *workflowProvider) ExtendContextWithValue(parent interpreter.UnifiedContext, key string, val interface{}) interpreter.UnifiedContext { +func (w *workflowProvider) ExtendContextWithValue( + parent interpreter.UnifiedContext, key string, val interface{}, +) interpreter.UnifiedContext { wfCtx, ok := parent.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -106,7 +128,9 @@ func (w *workflowProvider) ExtendContextWithValue(parent interpreter.UnifiedCont return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) } -func (w *workflowProvider) GoNamed(ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext)) { +func (w *workflowProvider) GoNamed( + ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext), +) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -141,7 +165,9 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func( return workflow.Await(wfCtx, condition) } -func (w *workflowProvider) WithActivityOptions(ctx interpreter.UnifiedContext, options interpreter.ActivityOptions) interpreter.UnifiedContext { +func (w *workflowProvider) WithActivityOptions( + ctx interpreter.UnifiedContext, options interpreter.ActivityOptions, +) interpreter.UnifiedContext { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -175,7 +201,9 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e return t.future.Get(wfCtx, valuePtr) } -func (w *workflowProvider) ExecuteActivity(ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}) (future interpreter.Future) { +func (w *workflowProvider) ExecuteActivity( + ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, +) (future interpreter.Future) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -210,7 +238,9 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration return workflow.Sleep(wfCtx, d) } -func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int { +func (w *workflowProvider) GetVersion( + ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int, +) int { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -237,7 +267,9 @@ func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, return t.channel.Receive(wfCtx, valuePtr) } -func (w *workflowProvider) GetSignalChannel(ctx interpreter.UnifiedContext, signalName string) interpreter.ReceiveChannel { +func (w *workflowProvider) GetSignalChannel( + ctx interpreter.UnifiedContext, signalName string, +) interpreter.ReceiveChannel { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") diff --git a/service/interpreter/globalVersioner.go b/service/interpreter/globalVersioner.go index 46c621ab..f4801a4d 100644 --- a/service/interpreter/globalVersioner.go +++ b/service/interpreter/globalVersioner.go @@ -1,53 +1,78 @@ package interpreter -import "github.com/indeedeng/iwf/service" +import ( + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/versions" +) const globalChangeId = "global" -const startingVersionUsingGlobalVersioning = 1 -const startingVersionOptimizedUpsertSearchAttribute = 2 -const startingVersionRenamedStateApi = 3 -const continueAsNewOnNoStates = 4 -const maxOfAllVersions = continueAsNewOnNoStates // GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api type GlobalVersioner struct { workflowProvider WorkflowProvider ctx UnifiedContext + version int + isFromStart bool // indicate the version is set when starting the workflow } -func NewGlobalVersioner(workflowProvider WorkflowProvider, ctx UnifiedContext) *GlobalVersioner { +func NewGlobalVersioner(workflowProvider WorkflowProvider, ctx UnifiedContext) (*GlobalVersioner, error) { + sas, err := workflowProvider.GetSearchAttributes(ctx, []iwfidl.SearchAttributeKeyAndType{ + {Key: ptr.Any(service.SearchAttributeGlobalVersion), + ValueType: ptr.Any(iwfidl.INT)}, + }) + if err != nil { + return nil, err + } + version := 0 + isFromStart := false + if len(sas) == 0 { + version = workflowProvider.GetVersion(ctx, globalChangeId, 0, versions.MaxOfAllVersions) + } else { + // TODO: future improvement https://github.com/indeedeng/iwf/issues/369 + attribute, ok := sas[service.SearchAttributeGlobalVersion] + if !ok { + panic("search attribute global version is not found") + } + version = int(attribute.GetIntegerValue()) + isFromStart = true + } + return &GlobalVersioner{ workflowProvider: workflowProvider, ctx: ctx, - } + version: version, + isFromStart: isFromStart, + }, nil } func (p *GlobalVersioner) IsAfterVersionOfContinueAsNewOnNoStates() bool { - version := p.workflowProvider.GetVersion(p.ctx, globalChangeId, 0, maxOfAllVersions) - return version >= continueAsNewOnNoStates + return p.version >= versions.StartingVersionContinueAsNewOnNoStates } func (p *GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning() bool { - version := p.workflowProvider.GetVersion(p.ctx, globalChangeId, 0, maxOfAllVersions) - return version >= startingVersionUsingGlobalVersioning + return p.version >= versions.StartingVersionUsingGlobalVersioning } func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool { - version := p.workflowProvider.GetVersion(p.ctx, globalChangeId, 0, maxOfAllVersions) - return version >= startingVersionOptimizedUpsertSearchAttribute + return p.version >= versions.StartingVersionOptimizedUpsertSearchAttribute } func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool { - version := p.workflowProvider.GetVersion(p.ctx, globalChangeId, 0, maxOfAllVersions) - return version >= startingVersionRenamedStateApi + return p.version >= versions.StartingVersionRenamedStateApi } func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error { + if p.isFromStart { + // the search attribute is already set when starting the workflow + return nil + } // TODO this bug in Cadence SDK may cause concurrent writes // https://github.com/uber-go/cadence-client/issues/1198 if p.workflowProvider.GetBackendType() != service.BackendTypeCadence { return p.workflowProvider.UpsertSearchAttributes(p.ctx, map[string]interface{}{ - service.SearchAttributeGlobalVersion: maxOfAllVersions, + service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions, }) } return nil diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces.go index aae24af9..d6328643 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces.go @@ -84,10 +84,15 @@ type WorkflowProvider interface { NewApplicationError(errType string, details interface{}) error IsApplicationError(err error) bool GetWorkflowInfo(ctx UnifiedContext) WorkflowInfo + GetSearchAttributes( + ctx UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, + ) (map[string]iwfidl.SearchAttribute, error) UpsertSearchAttributes(ctx UnifiedContext, attributes map[string]interface{}) error UpsertMemo(ctx UnifiedContext, memo map[string]iwfidl.EncodedObject) error SetQueryHandler(ctx UnifiedContext, queryType string, handler interface{}) error - SetRpcUpdateHandler(ctx UnifiedContext, updateType string, validator UnifiedRpcValidator, handler UnifiedRpcHandler) error + SetRpcUpdateHandler( + ctx UnifiedContext, updateType string, validator UnifiedRpcValidator, handler UnifiedRpcHandler, + ) error ExtendContextWithValue(parent UnifiedContext, key string, val interface{}) UnifiedContext GoNamed(ctx UnifiedContext, name string, f func(ctx UnifiedContext)) GetThreadCount() int diff --git a/service/interpreter/stateExecutionCounter.go b/service/interpreter/stateExecutionCounter.go index 8a3bc5b1..3b728ff0 100644 --- a/service/interpreter/stateExecutionCounter.go +++ b/service/interpreter/stateExecutionCounter.go @@ -20,7 +20,10 @@ type StateExecutionCounter struct { totalCurrentlyExecutingCount int // For "dead ends": count the total pending states } -func NewStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter) *StateExecutionCounter { +func NewStateExecutionCounter( + ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, + configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, +) *StateExecutionCounter { return &StateExecutionCounter{ ctx: ctx, provider: provider, @@ -28,13 +31,15 @@ func NewStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, con stateIdCurrentlyExecutingCounts: make(map[string]int), totalCurrentlyExecutingCount: 0, configer: configer, - globalVersioner: NewGlobalVersioner(provider, ctx), + globalVersioner: globalVersioner, continueAsNewCounter: continueAsNewCounter, } } -func RebuildStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, - stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, totalCurrentlyExecutingCount int, +func RebuildStateExecutionCounter( + ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, + stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, + totalCurrentlyExecutingCount int, configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ @@ -44,7 +49,7 @@ func RebuildStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, stateIdCurrentlyExecutingCounts: stateIdCurrentlyExecutingCounts, totalCurrentlyExecutingCount: totalCurrentlyExecutingCount, configer: configer, - globalVersioner: NewGlobalVersioner(provider, ctx), + globalVersioner: globalVersioner, continueAsNewCounter: continueAsNewCounter, } } diff --git a/service/interpreter/temporal/workflowProvider.go b/service/interpreter/temporal/workflowProvider.go index 160e4695..890596f9 100644 --- a/service/interpreter/temporal/workflowProvider.go +++ b/service/interpreter/temporal/workflowProvider.go @@ -2,6 +2,7 @@ package temporal import ( "errors" + "github.com/indeedeng/iwf/service/common/mapper" "time" "github.com/indeedeng/iwf/gen/iwfidl" @@ -37,7 +38,9 @@ func (w *workflowProvider) IsApplicationError(err error) bool { return errors.As(err, &applicationError) } -func (w *workflowProvider) NewInterpreterContinueAsNewError(ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput) error { +func (w *workflowProvider) NewInterpreterContinueAsNewError( + ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput, +) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -45,7 +48,9 @@ func (w *workflowProvider) NewInterpreterContinueAsNewError(ctx interpreter.Unif return workflow.NewContinueAsNewError(wfCtx, Interpreter, input) } -func (w *workflowProvider) UpsertSearchAttributes(ctx interpreter.UnifiedContext, attributes map[string]interface{}) error { +func (w *workflowProvider) UpsertSearchAttributes( + ctx interpreter.UnifiedContext, attributes map[string]interface{}, +) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -105,7 +110,21 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter } } -func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, queryType string, handler interface{}) error { +func (w *workflowProvider) GetSearchAttributes( + ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, +) (map[string]iwfidl.SearchAttribute, error) { + wfCtx, ok := ctx.GetContext().(workflow.Context) + if !ok { + panic("cannot convert to temporal workflow context") + } + sas := workflow.GetInfo(wfCtx).SearchAttributes + + return mapper.MapTemporalToIwfSearchAttributes(sas, requestedSearchAttributes) +} + +func (w *workflowProvider) SetQueryHandler( + ctx interpreter.UnifiedContext, queryType string, handler interface{}, +) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -114,7 +133,8 @@ func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, query } func (w *workflowProvider) SetRpcUpdateHandler( - ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler, + ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, + handler interpreter.UnifiedRpcHandler, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -136,7 +156,9 @@ func (w *workflowProvider) SetRpcUpdateHandler( ) } -func (w *workflowProvider) ExtendContextWithValue(parent interpreter.UnifiedContext, key string, val interface{}) interpreter.UnifiedContext { +func (w *workflowProvider) ExtendContextWithValue( + parent interpreter.UnifiedContext, key string, val interface{}, +) interpreter.UnifiedContext { wfCtx, ok := parent.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -144,7 +166,9 @@ func (w *workflowProvider) ExtendContextWithValue(parent interpreter.UnifiedCont return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) } -func (w *workflowProvider) GoNamed(ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext)) { +func (w *workflowProvider) GoNamed( + ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext), +) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -179,7 +203,9 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func( return workflow.Await(wfCtx, condition) } -func (w *workflowProvider) WithActivityOptions(ctx interpreter.UnifiedContext, options interpreter.ActivityOptions) interpreter.UnifiedContext { +func (w *workflowProvider) WithActivityOptions( + ctx interpreter.UnifiedContext, options interpreter.ActivityOptions, +) interpreter.UnifiedContext { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -216,7 +242,9 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e return t.future.Get(wfCtx, valuePtr) } -func (w *workflowProvider) ExecuteActivity(ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}) (future interpreter.Future) { +func (w *workflowProvider) ExecuteActivity( + ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, +) (future interpreter.Future) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -251,7 +279,9 @@ func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { return workflow.IsReplaying(wfCtx) } -func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int { +func (w *workflowProvider) GetVersion( + ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int, +) int { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -278,7 +308,9 @@ func (t *temporalReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, return t.channel.Receive(wfCtx, valuePtr) } -func (w *workflowProvider) GetSignalChannel(ctx interpreter.UnifiedContext, signalName string) interpreter.ReceiveChannel { +func (w *workflowProvider) GetSignalChannel( + ctx interpreter.UnifiedContext, signalName string, +) interpreter.ReceiveChannel { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") diff --git a/service/interpreter/versions/versions.go b/service/interpreter/versions/versions.go new file mode 100644 index 00000000..e120081e --- /dev/null +++ b/service/interpreter/versions/versions.go @@ -0,0 +1,7 @@ +package versions + +const StartingVersionUsingGlobalVersioning = 1 +const StartingVersionOptimizedUpsertSearchAttribute = 2 +const StartingVersionRenamedStateApi = 3 +const StartingVersionContinueAsNewOnNoStates = 4 +const MaxOfAllVersions = StartingVersionContinueAsNewOnNoStates diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 50bfa20e..0ef4e559 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -18,7 +18,11 @@ func InterpreterImpl( ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput, ) (*service.InterpreterWorkflowOutput, error) { var err error - globalVersioner := NewGlobalVersioner(provider, ctx) + globalVersioner, err := NewGlobalVersioner(provider, ctx) + if err != nil { + return nil, err + } + if globalVersioner.IsAfterVersionOfUsingGlobalVersioning() { err = globalVersioner.UpsertGlobalVersionSearchAttribute() if err != nil { @@ -70,7 +74,7 @@ func InterpreterImpl( continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) counterInfo := previous.StateExecutionCounterInfo - stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, + stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, globalVersioner, counterInfo.StateIdStartedCount, counterInfo.StateIdCurrentlyExecutingCount, counterInfo.TotalCurrentlyExecutingCount, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(previous.StateOutputs) @@ -82,7 +86,7 @@ func InterpreterImpl( timerProcessor = NewTimerProcessor(ctx, provider, nil) continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) - stateExecutionCounter = NewStateExecutionCounter(ctx, provider, workflowConfiger, continueAsNewCounter) + stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(nil) continueAsNewer = NewContinueAsNewer(provider, interStateChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor) } @@ -177,7 +181,7 @@ func InterpreterImpl( shouldSendSignalOnCompletion := slices.Contains(input.WaitForCompletionStateExecutionIds, stateExeId) decision, stateExecStatus, err := executeState( - ctx, provider, basicInfo, stateReq, stateExeId, persistenceManager, interStateChannel, + ctx, provider, globalVersioner, basicInfo, stateReq, stateExeId, persistenceManager, interStateChannel, signalReceiver, timerProcessor, continueAsNewer, continueAsNewCounter, shouldSendSignalOnCompletion) if err != nil { // this is the case where stateExecStatus == FailureStateExecutionStatus @@ -428,6 +432,7 @@ func checkClosingWorkflow( func executeState( ctx UnifiedContext, provider WorkflowProvider, + globalVersioner *GlobalVersioner, basicInfo service.BasicInfo, stateReq StateRequest, stateExeId string, @@ -439,7 +444,6 @@ func executeState( continueAsNewCounter *ContinueAsNewCounter, shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { - globalVersioner := NewGlobalVersioner(provider, ctx) waitUntilApi := StateStart executeApi := StateDecide if globalVersioner.IsAfterVersionOfRenamedStateApi() {