Skip to content

Commit

Permalink
Optimize versioning actions by sending via request (#368)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Feb 23, 2024
1 parent bc0b000 commit 06b9438
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 48 deletions.
2 changes: 2 additions & 0 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/md5"
"encoding/json"
"fmt"
"github.com/indeedeng/iwf/service/interpreter/versions"
"math"
"net/http"
"os"
Expand Down Expand Up @@ -61,6 +62,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
WorkflowExecutionTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second,
SearchAttributes: map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions,
},
}

Expand Down
52 changes: 42 additions & 10 deletions service/interpreter/cadence/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cadence

import (
"fmt"
"github.com/indeedeng/iwf/service/common/mapper"
"time"

"github.com/indeedeng/iwf/gen/iwfidl"
Expand Down Expand Up @@ -36,15 +37,19 @@ func (w *workflowProvider) IsApplicationError(err error) bool {
return ok
}

func (w *workflowProvider) NewInterpreterContinueAsNewError(ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput) error {
func (w *workflowProvider) NewInterpreterContinueAsNewError(
ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput,
) error {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
}
return workflow.NewContinueAsNewError(wfCtx, Interpreter, input)
}

func (w *workflowProvider) UpsertSearchAttributes(ctx interpreter.UnifiedContext, attributes map[string]interface{}) error {
func (w *workflowProvider) UpsertSearchAttributes(
ctx interpreter.UnifiedContext, attributes map[string]interface{},
) error {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
Expand Down Expand Up @@ -83,7 +88,21 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter
}
}

func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, queryType string, handler interface{}) error {
func (w *workflowProvider) GetSearchAttributes(
ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType,
) (map[string]iwfidl.SearchAttribute, error) {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
}
sas := workflow.GetInfo(wfCtx).SearchAttributes

return mapper.MapCadenceToIwfSearchAttributes(sas, requestedSearchAttributes)
}

func (w *workflowProvider) SetQueryHandler(
ctx interpreter.UnifiedContext, queryType string, handler interface{},
) error {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
Expand All @@ -92,21 +111,26 @@ func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, query
}

func (w *workflowProvider) SetRpcUpdateHandler(
ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler,
ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator,
handler interpreter.UnifiedRpcHandler,
) error {
// NOTE: this feature is not available in Cadence
return nil
}

func (w *workflowProvider) ExtendContextWithValue(parent interpreter.UnifiedContext, key string, val interface{}) interpreter.UnifiedContext {
func (w *workflowProvider) ExtendContextWithValue(
parent interpreter.UnifiedContext, key string, val interface{},
) interpreter.UnifiedContext {
wfCtx, ok := parent.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
}
return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val))
}

func (w *workflowProvider) GoNamed(ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext)) {
func (w *workflowProvider) GoNamed(
ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext),
) {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
Expand Down Expand Up @@ -141,7 +165,9 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func(
return workflow.Await(wfCtx, condition)
}

func (w *workflowProvider) WithActivityOptions(ctx interpreter.UnifiedContext, options interpreter.ActivityOptions) interpreter.UnifiedContext {
func (w *workflowProvider) WithActivityOptions(
ctx interpreter.UnifiedContext, options interpreter.ActivityOptions,
) interpreter.UnifiedContext {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
Expand Down Expand Up @@ -175,7 +201,9 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e
return t.future.Get(wfCtx, valuePtr)
}

func (w *workflowProvider) ExecuteActivity(ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}) (future interpreter.Future) {
func (w *workflowProvider) ExecuteActivity(
ctx interpreter.UnifiedContext, activity interface{}, args ...interface{},
) (future interpreter.Future) {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
Expand Down Expand Up @@ -210,7 +238,9 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration
return workflow.Sleep(wfCtx, d)
}

func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int {
func (w *workflowProvider) GetVersion(
ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int,
) int {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
Expand All @@ -237,7 +267,9 @@ func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext,
return t.channel.Receive(wfCtx, valuePtr)
}

func (w *workflowProvider) GetSignalChannel(ctx interpreter.UnifiedContext, signalName string) interpreter.ReceiveChannel {
func (w *workflowProvider) GetSignalChannel(
ctx interpreter.UnifiedContext, signalName string,
) interpreter.ReceiveChannel {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
Expand Down
59 changes: 42 additions & 17 deletions service/interpreter/globalVersioner.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,78 @@
package interpreter

import "github.com/indeedeng/iwf/service"
import (
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/interpreter/versions"
)

const globalChangeId = "global"
const startingVersionUsingGlobalVersioning = 1
const startingVersionOptimizedUpsertSearchAttribute = 2
const startingVersionRenamedStateApi = 3
const continueAsNewOnNoStates = 4
const maxOfAllVersions = continueAsNewOnNoStates

// GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api
type GlobalVersioner struct {
workflowProvider WorkflowProvider
ctx UnifiedContext
version int
isFromStart bool // indicate the version is set when starting the workflow
}

func NewGlobalVersioner(workflowProvider WorkflowProvider, ctx UnifiedContext) *GlobalVersioner {
func NewGlobalVersioner(workflowProvider WorkflowProvider, ctx UnifiedContext) (*GlobalVersioner, error) {
sas, err := workflowProvider.GetSearchAttributes(ctx, []iwfidl.SearchAttributeKeyAndType{
{Key: ptr.Any(service.SearchAttributeGlobalVersion),
ValueType: ptr.Any(iwfidl.INT)},
})
if err != nil {
return nil, err
}
version := 0
isFromStart := false
if len(sas) == 0 {
version = workflowProvider.GetVersion(ctx, globalChangeId, 0, versions.MaxOfAllVersions)
} else {
// TODO: future improvement https://github.com/indeedeng/iwf/issues/369
attribute, ok := sas[service.SearchAttributeGlobalVersion]
if !ok {
panic("search attribute global version is not found")
}
version = int(attribute.GetIntegerValue())
isFromStart = true
}

return &GlobalVersioner{
workflowProvider: workflowProvider,
ctx: ctx,
}
version: version,
isFromStart: isFromStart,
}, nil
}

func (p *GlobalVersioner) IsAfterVersionOfContinueAsNewOnNoStates() bool {
version := p.workflowProvider.GetVersion(p.ctx, globalChangeId, 0, maxOfAllVersions)
return version >= continueAsNewOnNoStates
return p.version >= versions.StartingVersionContinueAsNewOnNoStates
}

func (p *GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning() bool {
version := p.workflowProvider.GetVersion(p.ctx, globalChangeId, 0, maxOfAllVersions)
return version >= startingVersionUsingGlobalVersioning
return p.version >= versions.StartingVersionUsingGlobalVersioning
}

func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool {
version := p.workflowProvider.GetVersion(p.ctx, globalChangeId, 0, maxOfAllVersions)
return version >= startingVersionOptimizedUpsertSearchAttribute
return p.version >= versions.StartingVersionOptimizedUpsertSearchAttribute
}

func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool {
version := p.workflowProvider.GetVersion(p.ctx, globalChangeId, 0, maxOfAllVersions)
return version >= startingVersionRenamedStateApi
return p.version >= versions.StartingVersionRenamedStateApi
}

func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error {
if p.isFromStart {
// the search attribute is already set when starting the workflow
return nil
}
// TODO this bug in Cadence SDK may cause concurrent writes
// https://github.com/uber-go/cadence-client/issues/1198
if p.workflowProvider.GetBackendType() != service.BackendTypeCadence {
return p.workflowProvider.UpsertSearchAttributes(p.ctx, map[string]interface{}{
service.SearchAttributeGlobalVersion: maxOfAllVersions,
service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions,
})
}
return nil
Expand Down
7 changes: 6 additions & 1 deletion service/interpreter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,15 @@ type WorkflowProvider interface {
NewApplicationError(errType string, details interface{}) error
IsApplicationError(err error) bool
GetWorkflowInfo(ctx UnifiedContext) WorkflowInfo
GetSearchAttributes(
ctx UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType,
) (map[string]iwfidl.SearchAttribute, error)
UpsertSearchAttributes(ctx UnifiedContext, attributes map[string]interface{}) error
UpsertMemo(ctx UnifiedContext, memo map[string]iwfidl.EncodedObject) error
SetQueryHandler(ctx UnifiedContext, queryType string, handler interface{}) error
SetRpcUpdateHandler(ctx UnifiedContext, updateType string, validator UnifiedRpcValidator, handler UnifiedRpcHandler) error
SetRpcUpdateHandler(
ctx UnifiedContext, updateType string, validator UnifiedRpcValidator, handler UnifiedRpcHandler,
) error
ExtendContextWithValue(parent UnifiedContext, key string, val interface{}) UnifiedContext
GoNamed(ctx UnifiedContext, name string, f func(ctx UnifiedContext))
GetThreadCount() int
Expand Down
15 changes: 10 additions & 5 deletions service/interpreter/stateExecutionCounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,26 @@ type StateExecutionCounter struct {
totalCurrentlyExecutingCount int // For "dead ends": count the total pending states
}

func NewStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter) *StateExecutionCounter {
func NewStateExecutionCounter(
ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner,
configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter,
) *StateExecutionCounter {
return &StateExecutionCounter{
ctx: ctx,
provider: provider,
stateIdStartedCounts: make(map[string]int),
stateIdCurrentlyExecutingCounts: make(map[string]int),
totalCurrentlyExecutingCount: 0,
configer: configer,
globalVersioner: NewGlobalVersioner(provider, ctx),
globalVersioner: globalVersioner,
continueAsNewCounter: continueAsNewCounter,
}
}

func RebuildStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider,
stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, totalCurrentlyExecutingCount int,
func RebuildStateExecutionCounter(
ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner,
stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int,
totalCurrentlyExecutingCount int,
configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter,
) *StateExecutionCounter {
return &StateExecutionCounter{
Expand All @@ -44,7 +49,7 @@ func RebuildStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider,
stateIdCurrentlyExecutingCounts: stateIdCurrentlyExecutingCounts,
totalCurrentlyExecutingCount: totalCurrentlyExecutingCount,
configer: configer,
globalVersioner: NewGlobalVersioner(provider, ctx),
globalVersioner: globalVersioner,
continueAsNewCounter: continueAsNewCounter,
}
}
Expand Down
Loading

0 comments on commit 06b9438

Please sign in to comment.